
Getting Started with Hadoop Streaming

Note: this article uses Hadoop 2.6.0, and the examples are written in Python.

Overview

Hadoop Streaming is a programming utility shipped with Hadoop. It offers a very flexible interface that lets users write MapReduce jobs in any language, and it is a common way to write MapReduce without the Java API.

Streaming is invoked with the command below (the location of hadoop-streaming-x.x.jar varies between versions):

$ ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -input <input directory> \                          # multiple input paths may be given, e.g. -input '/user/foo/dir1' -input '/user/foo/dir2'
    -inputformat <input format JavaClassName> \
    -output <output directory> \
    -outputformat <output format JavaClassName> \
    -mapper <mapper executable or JavaClassName> \
    -reducer <reducer executable or JavaClassName> \
    -combiner <combiner executable or JavaClassName> \
    -partitioner <JavaClassName> \
    -cmdenv <name=value> \                              # passes environment variables into the tasks; may be given multiple times
    -file <dependency file> \                           # dependencies such as config files and dictionaries
    -D <name=value>                                     # job property settings

Note: -file is a deprecated option; use -files instead.

Common job properties

| Property | New name | Meaning | Notes |
| --- | --- | --- | --- |
| mapred.job.name | mapreduce.job.name | Job name | |
| mapred.map.tasks | mapreduce.job.maps | Number of map tasks per job | The number of maps actually launched cannot be fully controlled |
| mapred.reduce.tasks | mapreduce.job.reduces | Number of reduce tasks per job | |
| mapred.job.priority | mapreduce.job.priority | Job priority | VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH |
| stream.map.input.field.separator | | Field separator for map input | Defaults to \t |
| stream.reduce.input.field.separator | | Field separator for reduce input | Defaults to \t |
| stream.map.output.field.separator | | Field separator for map output | Defaults to \t |
| stream.reduce.output.field.separator | | Field separator for reduce output | |
| stream.num.map.output.key.fields | | Number of fields occupied by the key in a map output record | |
| stream.num.reduce.output.key.fields | | Number of fields occupied by the key in a reduce output record | |

Note: the 2.6.0 Streaming documentation only mentions stream.num.reduce.output.fields and does not mention stream.num.reduce.output.key.fields; the relationship between the two still needs to be checked.

Properties starting with stream are specific to Streaming, while mapred.map.tasks and mapred.reduce.tasks are generic MapReduce properties.
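As an illustrative sketch (the job name, separator value and placeholder paths below are not from the original article), the separator-related properties are passed with -D before the streaming options, for example to split the map output on '.' and treat the first two fields as the key:

$ ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -D mapreduce.job.name="separator_demo" \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=2 \
    -files "mapper.py,reducer.py" \
    -input <input directory> \
    -output <output directory> \
    -mapper "python mapper.py" \
    -reducer "python reducer.py"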

How it works

Hadoop Streaming requires the user-written Mapper/Reducer to read its input from standard input (stdin) and write its results to standard output (stdout), which is very similar to the Linux pipe mechanism.

Because of this, a Streaming MapReduce program can conveniently be tested locally on Linux:

$ cat <input_file> | <mapper executable> | sort | <reducer executable>

# Python streaming example
$ cat <input_file> | python mapper.py | sort | python reducer.py

WordCount example

Preparing the data

Replace <username> with your own user name.

$ cat input/input_0.txt
Hadoop is the Elephant King!
A yellow and elegant thing.
He never forgets
Useful data, or lets
An extraneous element cling!
$ cat input/input_1.txt
A wonderful king is Hadoop.
The elephant plays well with Sqoop.
But what helps him to thrive
Are Impala, and Hive,
And HDFS in the group.
$ cat input/input_2.txt
Hadoop is an elegant fellow.
An elephant gentle and mellow.
He never gets mad,
Or does anything bad,
Because, at his core, he is yellow.
$ ${HADOOP_HOME}/bin/hadoop fs -mkdir -p /user/<username>/wordcount
$ ${HADOOP_HOME}/bin/hadoop fs -put input/ /user/<username>/wordcount

Writing the Mapper

#!/bin/env python
# encoding: utf-8
import re
import sys

seperator_pattern = re.compile(r'[^a-zA-Z0-9]+')

for line in sys.stdin:
    for word in seperator_pattern.split(line):
        if word:
            print '%s\t%d' % (word.lower(), 1)

Writing the Reducer

#!/bin/env python
# encoding: utf-8
import sys

last_key = None
last_sum = 0

for line in sys.stdin:
    key, value = line.rstrip('\n').split('\t')
    if last_key is None:
        last_key = key
        last_sum = int(value)
    elif last_key == key:
        last_sum += int(value)
    else:
        print '%s\t%d' % (last_key, last_sum)
        last_sum = int(value)
        last_key = key

if last_key:
    print '%s\t%d' % (last_key, last_sum)

A Reducer using itertools.groupby

#!/bin/env python
# encoding: utf-8
import itertools
import sys

stdin_generator = (line for line in sys.stdin if line)

for key, values in itertools.groupby(stdin_generator, key=lambda x: x.split('\t')[0]):
    value_sum = sum((int(i.split('\t')[1]) for i in values))
    print '%s\t%d' % (key, value_sum)

The example code is overly simple; real code should include more exception handling, otherwise malformed input will make the program exit abnormally.
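For example, a slightly more defensive version of the reducer (a sketch, not from the original article) could skip malformed lines instead of crashing the task:

#!/bin/env python
# encoding: utf-8
import sys

last_key = None
last_sum = 0

for line in sys.stdin:
    # Skip lines that do not contain exactly one tab-separated key/value pair,
    # or whose value is not an integer, instead of letting the task die.
    fields = line.rstrip('\n').split('\t')
    if len(fields) != 2:
        continue
    key, value = fields
    try:
        value = int(value)
    except ValueError:
        continue
    if last_key == key:
        last_sum += value
    else:
        if last_key is not None:
            print '%s\t%d' % (last_key, last_sum)
        last_key = key
        last_sum = value

if last_key is not None:
    print '%s\t%d' % (last_key, last_sum)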

Debugging

Local testing

As mentioned earlier, the basic flow of Streaming is similar to a Linux pipe, so a simple test can be run locally first. This only verifies that the program logic roughly behaves as expected; job property settings and cluster behaviour cannot be checked this way.

$ cat input/* | python mapper.py | sort | python reducer.py
a       2
an      3
and     4
anything        1
are     1
at      1
bad     1
because 1
but     1
cling   1
core    1
data    1
does    1
elegant 2
element 1
elephant        3
extraneous      1
fellow  1
forgets 1
gentle  1
gets    1
group   1
hadoop  3
hdfs    1
he      3
helps   1
him     1
his     1
hive    1
impala  1
in      1
is      4
king    2
lets    1
mad     1
mellow  1
never   2
or      2
plays   1
sqoop   1
the     3
thing   1
thrive  1
to      1
useful  1
well    1
what    1
with    1
wonderful       1
yellow  2

Using Counters

Add a counter to the mapper that counts the tokens which are empty after splitting:

#!/bin/env python
# encoding: utf-8
import re
import sys

seperator_pattern = re.compile(r'[^a-zA-Z0-9]+')

def print_counter(group, counter, amount):
    print >> sys.stderr, 'reporter:counter:{g},{c},{a}'.format(g=group, c=counter, a=amount)

for line in sys.stdin:
    for word in seperator_pattern.split(line):
        if word:
            print '%s\t%d' % (word.lower(), 1)
        else:
            print_counter('wc', 'empty-word', 1)

The Streaming documentation describes how to update counters:

How do I update counters in streaming applications?

A streaming process can use the stderr to emit counter information. reporter:counter:<group>,<counter>,<amount> should be sent to stderr to update the counter.

In other words, printing a string of the form reporter:counter:<group>,<counter>,<amount> to stderr is enough to update the counter. This is a simple but very useful tool, and it helps a great deal with debugging and monitoring jobs.

Running on the cluster (with the number of reducers set to 3)

# Use -files. Note: the -D and -files options must come first; putting them after the other options causes an error (I am not sure why).
$ ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -D mapred.job.name="streaming_wordcount" \
    -D mapred.map.tasks=3 \
    -D mapred.reduce.tasks=3 \
    -D mapred.job.priority=HIGH \
    -files "mapper.py,reducer.py" \
    -input /user/<username>/wordcount/input \
    -output /user/<username>/wordcount/output \
    -mapper "python mapper.py" \
    -reducer "python reducer.py"

# Output (may differ between versions). -D is used here with the old property names,
# so some deprecation warnings appear at the beginning; they are omitted here.
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-unjar707084306300214621/] [] /tmp/streamjob5287904745550112970.jar tmpDir=null
15/09/29 10:35:14 INFO client.RMProxy: Connecting to ResourceManager at xxxxx/x.x.x.x:y
15/09/29 10:35:14 INFO client.RMProxy: Connecting to ResourceManager at xxxxx/x.x.x.x:y
15/09/29 10:35:15 INFO mapred.FileInputFormat: Total input paths to process : 3
15/09/29 10:35:15 INFO mapreduce.JobSubmitter: number of splits:3
15/09/29 10:35:15 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/09/29 10:35:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1440570785607_1597
15/09/29 10:35:15 INFO impl.YarnClientImpl: Submitted application application_1440570785607_1597
15/09/29 10:35:15 INFO mapreduce.Job: The url to track the job: http://xxxxx:yyy/proxy/application_1440570785607_1597/
15/09/29 10:35:15 INFO mapreduce.Job: Running job: job_1440570785607_1597
15/09/29 10:37:15 INFO mapreduce.Job: Job job_1440570785607_1597 running in uber mode : false
15/09/29 10:37:15 INFO mapreduce.Job:  map 0% reduce 0%
15/09/29 10:42:17 INFO mapreduce.Job:  map 33% reduce 0%
15/09/29 10:42:18 INFO mapreduce.Job:  map 100% reduce 0%
15/09/29 10:42:23 INFO mapreduce.Job:  map 100% reduce 100%
15/09/29 10:42:24 INFO mapreduce.Job: Job job_1440570785607_1597 completed successfully
15/09/29 10:42:24 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=689
                FILE: Number of bytes written=661855
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=822
                HDFS: Number of bytes written=379
                HDFS: Number of read operations=18
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=6
        Job Counters
                Launched map tasks=3
                Launched reduce tasks=3
                Rack-local map tasks=3
                Total time spent by all maps in occupied slots (ms)=10657
                Total time spent by all reduces in occupied slots (ms)=21644
                Total time spent by all map tasks (ms)=10657
                Total time spent by all reduce tasks (ms)=10822
                Total vcore-seconds taken by all map tasks=10657
                Total vcore-seconds taken by all reduce tasks=10822
                Total megabyte-seconds taken by all map tasks=43651072
                Total megabyte-seconds taken by all reduce tasks=88653824
        Map-Reduce Framework
                Map input records=15
                Map output records=72
                Map output bytes=527
                Map output materialized bytes=725
                Input split bytes=423
                Combine input records=0
                Combine output records=0
                Reduce input groups=50
                Reduce shuffle bytes=725
                Reduce input records=72
                Reduce output records=50
                Spilled Records=144
                Shuffled Maps =9
                Failed Shuffles=0
                Merged Map outputs=9
                GC time elapsed (ms)=72
                CPU time spent (ms)=7870
                Physical memory (bytes) snapshot=3582062592
                Virtual memory (bytes) snapshot=29715922944
                Total committed heap usage (bytes)=10709630976
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=399
        File Output Format Counters
                Bytes Written=379
        wc
                empty-word=15
15/09/29 10:42:24 INFO streaming.StreamJob: Output directory: /user/<username>/wordcount/output

Several parts of the command output deserve attention:

  1. The url to track the job: http://xxxxx:yyy/proxy/application_1440570785607_1597/ opens a web page where the status of the job can be checked
  2. map 0% reduce 0% shows the progress of the map and reduce phases
  3. The Counters section at the end contains the framework's built-in counters; custom counters can be defined to report extra task status information
  4. Output directory: /user/<username>/wordcount/output is the directory where the results are written (the commands below show one way to inspect them)
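For a quick check of the results, the output files can be listed and printed from HDFS, for example (illustrative commands, not part of the original article's output):

$ ${HADOOP_HOME}/bin/hadoop fs -ls /user/<username>/wordcount/output
$ ${HADOOP_HOME}/bin/hadoop fs -cat /user/<username>/wordcount/output/part-*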

Common problems and solutions

Python environment problems on the cluster

Use an archive (-archives) to ship a Python binary environment to the cluster:

$ wget https://www.python.org/ftp/python/2.7.10/Python-2.7.10.tgz
$ tar xzf Python-2.7.10.tgz
$ cd Python-2.7.10
# compile
$ ./configure --prefix=/home/<username>/wordcount/python27
$ make -j
$ make install
# package it as python27.tar.gz
$ cd /home/<username>/wordcount/
$ tar czf python27.tar.gz python27/
# upload it to HDFS
$ ${HADOOP_HOME}/bin/hadoop fs -mkdir -p /tools/
$ ${HADOOP_HOME}/bin/hadoop fs -put python27.tar.gz /tools
# start the job using the Python version that was just uploaded
$ ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -D mapred.reduce.tasks=3 \
    -files "mapper.py,reducer.py" \
    -archives "hdfs://xxxxx:9000/tools/python27.tar.gz#py" \
    -input /user/<username>/wordcount/input \
    -output /user/<username>/wordcount/output \
    -mapper "py/python27/bin/python mapper.py" \
    -reducer "py/python27/bin/python reducer.py"

Multi-way reduce output

Sometimes a MapReduce program needs to produce two different kinds of output data; in that case multi-way output can be used.

Older versions provided this through the outputformat: org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat and org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat support multi-way output. The output record format changes from the usual key \t value to key \t value#suffix, where the suffix must be a letter A-Z (anything else causes an error); each suffix selects a different output stream, so up to 26-way output is supported. The final output files are named part-xxxx-A, part-xxxx-B, and so on, one per suffix.
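As a minimal sketch, assuming one of the suffix output formats above is configured for the job, a reducer can route each record by appending the suffix; the routing rule below is made up purely for illustration:

#!/bin/env python
# encoding: utf-8
import sys

# Assumes the job was launched with an outputformat such as SuffixMultipleTextOutputFormat;
# the '#A' / '#B' suffix on each emitted record selects which output file it goes to.
for line in sys.stdin:
    key, value = line.rstrip('\n').split('\t')
    if int(value) > 1:                        # hypothetical routing rule, for illustration only
        print '%s\t%s#A' % (key, value)       # written to part-xxxxx-A
    else:
        print '%s\t%s#B' % (key, value)       # written to part-xxxxx-B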

Newer versions only offer MultipleOutputs, and I have not yet found a way to use it from Streaming.

Multi-way map input

Specifying -input more than once gives multi-way input. In practice, different inputs may need to be processed differently, which requires knowing the input path in order to tell which input directory or file a record came from. Streaming provides Configured_Parameters, which expose some runtime information:

| Name | Type | Description |
| --- | --- | --- |
| mapreduce.job.id | String | The job id |
| mapreduce.job.jar | String | job.jar location in job directory |
| mapreduce.job.local.dir | String | The job specific shared scratch space |
| mapreduce.task.id | String | The task id |
| mapreduce.task.attempt.id | String | The task attempt id |
| mapreduce.task.is.map | boolean | Is this a map task |
| mapreduce.task.partition | int | The id of the task within the job |
| mapreduce.map.input.file | String | The filename that the map is reading from |
| mapreduce.map.input.start | long | The offset of the start of the map input split |
| mapreduce.map.input.length | long | The number of bytes in the map input split |
| mapreduce.task.output.dir | String | The task's temporary output directory |

While a Streaming job is running, the names of these mapreduce parameters are transformed: every dot (.) becomes an underscore (_). For example, mapreduce.job.id becomes mapreduce_job_id. All of these parameters can be read from environment variables.

Coming back to the problem above, the input path name can be obtained through mapreduce.map.input.file:

import os

input_file = os.environ['mapreduce_map_input_file']
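Building on that, a mapper can branch on the input path. A minimal sketch follows; the input_logs directory name is a hypothetical example, not from the original article:

#!/bin/env python
# encoding: utf-8
import os
import sys

# Set by the framework for each map task; dots in the property name become underscores.
input_file = os.environ.get('mapreduce_map_input_file', '')

for line in sys.stdin:
    line = line.rstrip('\n')
    # Hypothetical layout: records coming from an input_logs directory are handled
    # differently from plain text inputs.
    if '/input_logs/' in input_file:
        print 'log\t%s' % line
    else:
        print 'text\t%s' % line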

Miscellaneous

Python libraries that wrap Streaming

  1. mrjob (see the sketch below)
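A minimal mrjob sketch of the same word count (the file name and code are illustrative, not from the original article); mrjob generates the streaming plumbing for you:

# wordcount_mrjob.py -- hypothetical file name
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r'[a-zA-Z0-9]+')

class MRWordCount(MRJob):

    def mapper(self, _, line):
        # Emit (word, 1) for every word in the line.
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def reducer(self, word, counts):
        # Sum the counts for each word.
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()

It can be run locally with python wordcount_mrjob.py input/input_0.txt, or on the cluster with the -r hadoop runner.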

Libraries in the Hadoop ecosystem

  1. snakebite: an HDFS client implemented in pure Python (a brief sketch follows below)
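A minimal snakebite sketch (the NameNode host and port are placeholders):

from snakebite.client import Client

# Connect to the NameNode (host and port are placeholders).
client = Client('namenode-host', 9000)

# List the wordcount output directory and print each entry's path.
for entry in client.ls(['/user/<username>/wordcount/output']):
    print entry['path']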

References

  1. Apache Hadoop MapReduce Streaming
  2. Hadoop Streaming Programming, by Dong Xicheng (董西成)
  3. Deprecated Properties: mapping between old and new property names

Reprinted from: http://icejoywoo.github.io/hadoop/2015/09/28/introduction-to-hadoop-streaming.html
