Hadoop streaming详细介绍
作者:Hadoop streaming 发布时间:2023-10-12 04:43:50
Hadoop streaming
Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java。这里要介绍的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作为我们mapreduce程序和MapReduce框架之间的接口。所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standard input/output上进行读写。
streamming是天然适用于文字处理的(text processing),当然,也仅适用纯文本的处理,对于需要对象和序列化的场景,hadoop streaming无能为力。它力图使我们能够快捷的通过各种脚本语言,快速的处理大量的文本文件。以下是steaming的一些特点:
Map函数的输入是通过stand input一行一行的接收数据的。(不像Java API,通过InputFormat类做预处理,使得Map函数的输入是有Key和value的)
Map函数的output则必须限定为key-value pair,key和value之间用\t分开。(MapReduce框架在处理intermediate的Map输出时,必须做sort和partition,即shuffle)
Reduce函数的input是Map函数的output也是key-value pair,key和value之间用\t分开。
常用的Streaming编程语言:
bash shell
ruby
python
Ruby
下面是一个Ruby编写的MapReduce程序的示例:
map
max_temperature_map.rb:
ruby
#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
从标准输入读入一行data。
处理数据之后,生成一个键值对,用\t分隔,输出到标准输出
reduce
max_temperature_reduce.rb:
ruby
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{last_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key
从标准输入读入一行数据
数据是用\t分隔的键值对
数据是被MapReduce根据key排序之后顺序一行一行读入
reduce函数对数据进行处理,并输出,输出仍是用\t分隔的键值对
运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar指明了使用hadoop streaming
hadoop-*-streaming.jar会将input里的文件,一行一行的输出到标准输出。
用-mapper指定Map函数。类似于通过管道将数据传给rb文件: data|ch02/src/main/ruby/max_temperature_map.rb
-reducer指定Reduce函数。
Python
Map
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)
Reduce
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.py\
-reducer ch02/src/main/ruby/max_temperature_reduce.py
Bash shell
Map
#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .
# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
gunzip -c $file >> $target.all
echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz
运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-D mapred.reduce.tasks=0 \
-D mapred.map.tasks.speculative.execution=false \
-D mapred.task.timeout=12000000 \
-input ncdc_files.txt \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-output output \
-mapper load_ncdc_map.sh \
-file load_ncdc_map.sh
这里的-D mapred.reduce.tasks=0将reduce task观掉,因此也不需要设置-reducer
只使用Mapper,可以通过MapReduce帮助我们并行的完成一些平时只能串行的shell脚本
注意这里的-file,在集群模式下,需要并行运行时,需要-file把文件传输到其他节点
Combiner
在streaming模式下,仍然可以运行Combiner,两种方法:
通过Java编写一个combiner的函数,并使用-combiner option
以命令行的管道模式完成combiner的任务
这里具体解释第二种方法:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
ch02/src/main/ruby/max_temperature_reduce.rb" \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb
注意看-mapper这一行,通关管道的方式,把mapper的临时输出文件(intermediate file,Map完成后的临时文件)作为输入,送到sort进行排序,然后送到reduce脚本,来完成类似于combiner的工作。这时候的输出才真正的作为shuffle的输入,被分组并在网络上发送到Reduce
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
猜你喜欢
- (4)"虚拟系统"中的"/dev"与"/devices"目录作"/de
- 检测远程端口是否打开常用telnet 110.101.101.101 80方式测试远程主机端口是否打开。 除此之外还可
- 这本小册子薄得让人有点为难,但是这已经有了足够的信息量让所有人来重新审视一个产业,没错,从今天开始,SEM 这个产业可以划分为「之前」和「以
- 实现方法: 第一步:channelunit.func.php中添加如下函数 //参数说明:第1个参数是从信息表里读取出来的类别ID,第2个参
- 1、Linux是一个多用户的操作系统。每个用户登录系统后,都会有一个专用的运行环境。通常每个用户默认的环境都是相同的,这个默认环境实际上就是
- ASP 的新增功能除了内部性能增强和优化外,该版本的 Active Server Pages(ASP) 还具有如下新功能:更好的国际化和 U
- 2008 年 8 月的最后一周,我们将暂停 AdSense 推介计划。这可能会给正在投放推介广告的发布商带来一些不便,我们在这里表示歉意。G
- 这几天,公司终于放假了,终于可以安下心来好好学些proftp+mysql+quota。 安装proftp之前,必须先做一个工作,假如你的my
- 如果你负担不起付费主机,那免费使用的虚拟主机往往是最好的选择。但在你决定使用免费主机以前有几点是你必须要注意的,最重要的是你不能期望和要求免
- 很多朋友问,seo要选取什么关键字才能达到效果,以下发表一下我的看法。一、不要太相信top.baidu.com那些关键字是热门,但也是个个人
- 对于中小型企业来说,一些资源需要在局域网中共享。可如果用户找一个文件,需要到不同的机器里去查找,这样不但耽误时间,而且会大大降低员工们的工作
- 我写这片文章只是想让你明白深刻理解某一协议的好处。高手免看。如果有人利用这片文章所做的一切事情,盖不负责。网上关于ARP的资料已经很多了,就
- 在Windows IIS 6.0下配置PHP,通常有CGI、ISAPI和FastCGI三种配置方式,这三种模式都可以在IIS 6.0下成功运
- WP-PageNavi一款是由 Lester “GaMerZ” Chan 开发的文章分页插件, 给你的WordPress博客主题页面导航来个
- 2008年2月27日,Google PankRank进行了2008年的首次更新,我的几个站点没有什么变化。另外,昨天在群里看到一个人叫卖PR
- “忽如一夜春风来,千树万树梨花开”,这句诗跟现今网页游戏遍地开花的景象甚是贴合。据不完全数据统计,目前我国有近300多家互联网公司在开发以及
- 百度联盟网盟推广图文混排产品形式上线,全新产品形式将助您提升收益!图文混排是主题描述产品在文字、图片形式之外新增加的一种推广形式。它是一种除
- 最近单位局域网电脑上网,大多数网站页面显示都会异常,卡巴在访问网站的时候会提示“"木马程序Trojan-Downloader.JS
- 相信读者都知道,Google对域名信任度要求越来越高,甚至高到了过分的地步。不过抱怨是没有用的,必须面对现实。从总体上来说,提高域名信任度的
- 一、工具的使用1、学会使用vim/emacs,vim/emacs是linux下最常用的源码编辑具,不光要学会用它们编辑源码,还要学会用它们进