1. 程式人生 > >Hadoop Streaming

Hadoop Streaming

earch IT fault target generate 完成 hadoop集群 問題 tor

原文地址:http://hadoop.apache.org/docs/r1.0.4/cn/streaming.html

  • Hadoop Streaming
  • Streaming工作原理
  • 將文件打包到提交的作業中
  • Streaming選項與用法
    • 只使用Mapper的作業
    • 為作業指定其他插件
    • Hadoop Streaming中的大文件和檔案
    • 為作業指定附加配置參數
    • 其他選項
  • 其他例子
    • 使用自定義的方法切分行來形成Key/Value對
    • 一個實用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項)
    • Hadoop聚合功能包的使用(-reduce aggregate 選項)
    • 字段的選取(類似於unix中的 ‘cut‘ 命令)
  • 常見問題
    • 我該怎樣使用Hadoop Streaming運行一組獨立(相關)的任務呢?
    • 如何處理多個文件,其中每個文件一個map?
    • 應該使用多少個reducer?
    • 如果在Shell腳本裏設置一個別名,並放在-mapper之後,Streaming會正常運行嗎? 例如,alias cl=‘cut -fl‘,-mapper "cl"會運行正常嗎?
    • 我可以使用UNIX pipes嗎?例如 –mapper "cut –fl | set s/foo/bar/g"管用麽?
    • 在streaming作業中用-file選項運行一個分布式的超大可執行文件(例如,3.6G)時, 我得到了一個錯誤信息“No space left on device”。如何解決?
    • 如何設置多個輸入目錄?
    • 如何生成gzip格式的輸出文件?
    • Streaming中如何自定義input/output format?
    • Streaming如何解析XML文檔?
    • 在streaming應用程序中如何更新計數器?
    • 如何更新streaming應用程序的狀態?

Hadoop Streaming

Hadoop streaming是Hadoop的一個工具, 它幫助用戶創建和運行一類特殊的map/reduce作業, 這些特殊的map/reduce作業是由一些可執行文件或腳本文件充當mapper或者reducer。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper /bin/cat     -reducer /bin/wc

Streaming工作原理

在上面的例子裏,mapper和reducer都是可執行文件,它們從標準輸入讀入數據(一行一行讀), 並把計算結果發給標準輸出。Streaming工具會創建一個Map/Reduce作業, 並把它發送給合適的集群,同時監視這個作業的整個執行過程。

如果一個可執行文件被用於mapper,則在mapper初始化時, 每一個mapper任務會把這個可執行文件作為一個單獨的進程啟動。 mapper任務運行時,它把輸入切分成行並把每一行提供給可執行文件進程的標準輸入。 同時,mapper收集可執行文件進程標準輸出的內容,並把收到的每一行內容轉化成key/value對,作為mapper的輸出。 默認情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。 如果沒有tab,整行作為key值,value值為null。不過,這可以定制,在下文中將會討論如何自定義key和value的切分方式。

如果一個可執行文件被用於reducer,每個reducer任務會把這個可執行文件作為一個單獨的進程啟動。 Reducer任務運行時,它把輸入切分成行並把每一行提供給可執行文件進程的標準輸入。 同時,reducer收集可執行文件進程標準輸出的內容,並把每一行內容轉化成key/value對,作為reducer的輸出。 默認情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。在下文中將會討論如何自定義key和value的切分方式。

這是Map/Reduce框架和streaming mapper/reducer之間的基本通信協議。

用戶也可以使用java類作為mapper或者reducer。上面的例子與這裏的代碼等價:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper org.apache.hadoop.mapred.lib.IdentityMapper     -reducer /bin/wc

用戶可以設定stream.non.zero.exit.is.failure true 或false 來表明streaming task的返回值非零時是 Failure 還是Success。默認情況,streaming task返回非零時表示失敗。

將文件打包到提交的作業中

任何可執行文件都可以被指定為mapper/reducer。這些可執行文件不需要事先存放在集群上; 如果在集群上還沒有,則需要用-file選項讓framework把可執行文件作為作業的一部分,一起打包提交。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper myPythonScript.py     -reducer /bin/wc     -file myPythonScript.py 

上面的例子描述了一個用戶把可執行python文件作為mapper。 其中的選項“-file myPythonScirpt.py”使可執行python文件作為作業提交的一部分被上傳到集群的機器上。

除了可執行文件外,其他mapper或reducer需要用到的輔助文件(比如字典,配置文件等)也可以用這種方式打包上傳。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper myPythonScript.py     -reducer /bin/wc     -file myPythonScript.py     -file myDictionary.txt

Streaming選項與用法

只使用Mapper的作業

有時只需要map函數處理輸入數據。這時只需把mapred.reduce.tasks設置為零,Map/reduce框架就不會創建reducer任務,mapper任務的輸出就是整個作業的最終輸出。

為了做到向下兼容,Hadoop Streaming也支持“-reduce None”選項,它與“-jobconf mapred.reduce.tasks=0”等價。

為作業指定其他插件

和其他普通的Map/Reduce作業一樣,用戶可以為streaming作業指定其他插件:

   -inputformat JavaClassName
   -outputformat JavaClassName
   -partitioner JavaClassName
   -combiner JavaClassName

用於處理輸入格式的類要能返回Text類型的key/value對。如果不指定輸入格式,則默認會使用TextInputFormat。 因為TextInputFormat得到的key值是LongWritable類型的(其實key值並不是輸入文件中的內容,而是value偏移量), 所以key會被丟棄,只把value用管道方式發給mapper。

用戶提供的定義輸出格式的類需要能夠處理Text類型的key/value對。如果不指定輸出格式,則默認會使用TextOutputFormat類。

Hadoop Streaming中的大文件和檔案

任務使用-cacheFile和-cacheArchive選項在集群中分發文件和檔案,選項的參數是用戶已上傳至HDFS的文件或檔案的URI。這些文件和檔案在不同的作業間緩存。用戶可以通過fs.default.name.config配置參數的值得到文件所在的host和fs_port。

這個是使用-cacheFile選項的例子:

-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink

在上面的例子裏,url中#後面的部分是建立在任務當前工作目錄下的符號鏈接的名字。這裏的任務的當前工作目錄下有一個“testlink”符號鏈接,它指向testfile.txt文件在本地的拷貝。如果有多個文件,選項可以寫成:

-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2

-cacheArchive選項用於把jar文件拷貝到任務當前工作目錄並自動把jar文件解壓縮。例如:

-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3

在上面的例子中,testlink3是當前工作目錄下的符號鏈接,它指向testfile.jar解壓後的目錄。

下面是使用-cacheArchive選項的另一個例子。其中,input.txt文件有兩行內容,分別是兩個文件的名字:testlink/cache.txt和testlink/cache2.txt。“testlink”是指向檔案目錄(jar文件解壓後的目錄)的符號鏈接,這個目錄下有“cache.txt”和“cache2.txt”兩個文件。

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar                   -input "/user/me/samples/cachefile/input.txt"                    -mapper "xargs cat"                    -reducer "cat"                    -output "/user/me/samples/cachefile/out" \  
                  -cacheArchive ‘hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink‘ \  
                  -jobconf mapred.map.tasks=1                   -jobconf mapred.reduce.tasks=1 \ 
                  -jobconf mapred.job.name="Experiment"

$ ls test_jar/
cache.txt  cache2.txt

$ jar cvf cachedir.jar -C test_jar/ .
added manifest
adding: cache.txt(in = 30) (out= 29)(deflated 3%)
adding: cache2.txt(in = 37) (out= 35)(deflated 5%)

$ hadoop dfs -put cachedir.jar samples/cachefile

$ hadoop dfs -cat /user/me/samples/cachefile/input.txt
testlink/cache.txt
testlink/cache2.txt

$ cat test_jar/cache.txt 
This is just the cache string

$ cat test_jar/cache2.txt 
This is just the second cache string

$ hadoop dfs -ls /user/me/samples/cachefile/out      
Found 1 items
/user/me/samples/cachefile/out/part-00000  <r 3>   69

$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000
This is just the cache string   
This is just the second cache string

為作業指定附加配置參數

用戶可以使用“-jobconf <n>=<v>”增加一些配置變量。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper org.apache.hadoop.mapred.lib.IdentityMapper    -reducer /bin/wc     -jobconf mapred.reduce.tasks=2

上面的例子中,-jobconf mapred.reduce.tasks=2表明用兩個reducer完成作業。

關於jobconf參數的更多細節可以參考:hadoop-default.html

其他選項

Streaming 作業的其他選項如下表:

選項可選/必須描述
-cluster name 可選 在本地Hadoop集群與一個或多個遠程集群間切換
-dfs host:port or local 可選 覆蓋作業的HDFS配置
-jt host:port or local 可選 覆蓋作業的JobTracker配置
-additionalconfspec specfile 可選 用一個類似於hadoop-site.xml的XML文件保存所有配置,從而不需要用多個"-jobconf name=value"類型的選項單獨為每個配置變量賦值
-cmdenv name=value 可選 傳遞環境變量給streaming命令
-cacheFile fileNameURI 可選 指定一個上傳到HDFS的文件
-cacheArchive fileNameURI 可選 指定一個上傳到HDFS的jar文件,這個jar文件會被自動解壓縮到當前工作目錄下
-inputreader JavaClassName 可選 為了向下兼容:指定一個record reader類(而不是input format類)
-verbose 可選 詳細輸出

使用-cluster <name>實現“本地”Hadoop和一個或多個遠程Hadoop集群間切換。默認情況下,使用hadoop-default.xml和hadoop-site.xml;當使用-cluster <name>選項時,會使用$HADOOP_HOME/conf/hadoop-<name>.xml。

下面的選項改變temp目錄:

  -jobconf dfs.data.dir=/tmp

下面的選項指定其他本地temp目錄:

   -jobconf mapred.local.dir=/tmp/local
   -jobconf mapred.system.dir=/tmp/system
   -jobconf mapred.temp.dir=/tmp/temp

更多有關jobconf的細節請參考:http://wiki.apache.org/hadoop/JobConfFile

在streaming命令中設置環境變量:

-cmdenv EXAMPLE_DIR=/home/example/dictionaries/

其他例子

使用自定義的方法切分行來形成Key/Value對

之前已經提到,當Map/Reduce框架從mapper的標準輸入讀取一行時,它把這一行切分為key/value對。 在默認情況下,每行第一個tab符之前的部分作為key,之後的部分作為value(不包括tab符)。

但是,用戶可以自定義,可以指定分隔符是其他字符而不是默認的tab符,或者指定在第n(n>=1)個分割符處分割而不是默認的第一個。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper org.apache.hadoop.mapred.lib.IdentityMapper     -reducer org.apache.hadoop.mapred.lib.IdentityReducer     -jobconf stream.map.output.field.separator=.     -jobconf stream.num.map.output.key.fields=4 

在上面的例子,“-jobconf stream.map.output.field.separator=.”指定“.”作為map輸出內容的分隔符,並且從在第四個“.”之前的部分作為key,之後的部分作為value(不包括這第四個“.”)。 如果一行中的“.”少於四個,則整行的內容作為key,value設為空的Text對象(就像這樣創建了一個Text:new Text(""))。

同樣,用戶可以使用“-jobconf stream.reduce.output.field.separator=SEP”和“-jobconf stream.num.reduce.output.fields=NUM”來指定reduce輸出的行中,第幾個分隔符處分割key和value。

一個實用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項)

Hadoop有一個工具類org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, 它在應用程序中很有用。Map/reduce框架用這個類切分map的輸出, 切分是基於key值的前綴,而不是整個key。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper org.apache.hadoop.mapred.lib.IdentityMapper     -reducer org.apache.hadoop.mapred.lib.IdentityReducer     -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner     -jobconf stream.map.output.field.separator=.     -jobconf stream.num.map.output.key.fields=4     -jobconf map.output.key.field.separator=.     -jobconf num.key.fields.for.partition=2     -jobconf mapred.reduce.tasks=12

其中,-jobconf stream.map.output.field.separator=.-jobconf stream.num.map.output.key.fields=4是前文中的例子。Streaming用這兩個變量來得到mapper的key/value對。

上面的Map/Reduce 作業中map輸出的key一般是由“.”分割成的四塊。但是因為使用了 -jobconf num.key.fields.for.partition=2 選項,所以Map/Reduce框架使用key的前兩塊來切分map的輸出。其中, -jobconf map.output.key.field.separator=. 指定了這次切分使用的key的分隔符。這樣可以保證在所有key/value對中, key值前兩個塊值相同的所有key被分到一組,分配給一個reducer。

這種高效的方法等價於指定前兩塊作為主鍵,後兩塊作為副鍵。 主鍵用於切分塊,主鍵和副鍵的組合用於排序。一個簡單的示例如下:

Map的輸出(key)

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

切分給3個reducer(前兩塊的值用於切分)

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

在每個切分後的組內排序(四個塊的值都用於排序)

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

Hadoop聚合功能包的使用(-reduce aggregate 選項)

Hadoop有一個工具包“Aggregate”( https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate)。 “Aggregate”提供一個特殊的reducer類和一個特殊的combiner類, 並且有一系列的“聚合器”(“aggregator”)(例如“sum”,“max”,“min”等)用於聚合一組value的序列。 用戶可以使用Aggregate定義一個mapper插件類, 這個類用於為mapper輸入的每個key/value對產生“可聚合項”。 combiner/reducer利用適當的聚合器聚合這些可聚合項。

要使用Aggregate,只需指定“-reducer aggregate”:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper myAggregatorForKeyCount.py     -reducer aggregate     -file myAggregatorForKeyCount.py     -jobconf mapred.reduce.tasks=12

python程序myAggregatorForKeyCount.py例子:

#!/usr/bin/python

import sys;

def generateLongCountToken(id):
    return "LongValueSum:" + id + "\t" + "1"

def main(argv):
    line = sys.stdin.readline();
    try:
        while line:
            line = line[:-1];
            fields = line.split("\t");
            print generateLongCountToken(fields[0]);
            line = sys.stdin.readline();
    except "end of file":
        return None
if __name__ == "__main__":
     main(sys.argv)

字段的選取(類似於unix中的 ‘cut‘ 命令)

Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduce幫助用戶高效處理文本數據, 就像unix中的“cut”工具。工具類中的map函數把輸入的key/value對看作字段的列表。 用戶可以指定字段的分隔符(默認是tab), 可以選擇字段列表中任意一段(由列表中一個或多個字段組成)作為map輸出的key或者value。 同樣,工具類中的reduce函數也把輸入的key/value對看作字段的列表,用戶可以選取任意一段作為reduce輸出的key或value。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input myInputDirs     -output myOutputDir     -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce    -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner     -jobconf map.output.key.field.separa=.     -jobconf num.key.fields.for.partition=2     -jobconf mapred.data.field.separator=.     -jobconf map.output.key.value.fields.spec=6,5,1-3:0-     -jobconf reduce.output.key.value.fields.spec=0-2:5-     -jobconf mapred.reduce.tasks=12

選項“-jobconf map.output.key.value.fields.spec=6,5,1-3:0-”指定了如何為map的輸出選取key和value。Key選取規則和value選取規則由“:”分割。 在這個例子中,map輸出的key由字段6,5,1,2和3組成。輸出的value由所有字段組成(“0-”指字段0以及之後所有字段)。

選項“-jobconf reduce.output.key.value.fields.spec=0-2:0-”(譯者註:此處應為”0-2:5-“)指定如何為reduce的輸出選取value。 本例中,reduce的輸出的key將包含字段0,1,2(對應於原始的字段6,5,1)。 reduce輸出的value將包含起自字段5的所有字段(對應於所有的原始字段)。

常見問題

我該怎樣使用Hadoop Streaming運行一組獨立(相關)的任務呢?

多數情況下,你不需要Map Reduce的全部功能, 而只需要運行同一程序的多個實例,或者使用不同數據,或者在相同數據上使用不同的參數。 你可以通過Hadoop Streaming來實現。

如何處理多個文件,其中每個文件一個map?

例如這樣一個問題,在集群上壓縮(zipping)一些文件,你可以使用以下幾種方法:

  1. 使用Hadoop Streaming和用戶編寫的mapper腳本程序:
    • 生成一個文件,文件中包含所有要壓縮的文件在HDFS上的完整路徑。每個map 任務獲得一個路徑名作為輸入。
    • 創建一個mapper腳本程序,實現如下功能:獲得文件名,把該文件拷貝到本地,壓縮該文件並把它發到期望的輸出目錄。
  2. 使用現有的Hadoop框架:
    • 在main函數中添加如下命令:
             FileOutputFormat.setCompressOutput(conf, true);
             FileOutputFormat.setOutputCompressorClass(conf, org.apache.hadoop.io.compress.GzipCodec.class);
             conf.setOutputFormat(NonSplitableTextInputFormat.class);
             conf.setNumReduceTasks(0);
      
    • 編寫map函數:
             public void map(WritableComparable key, Writable value, 
                                     OutputCollector output, 
                                     Reporter reporter) throws IOException {
                  output.collect((Text)value, null);
             }
      
    • 註意輸出的文件名和原文件名不同

應該使用多少個reducer?

請參考Hadoop Wiki:Reducer

如果在Shell腳本裏設置一個別名,並放在-mapper之後,Streaming會正常運行嗎? 例如,alias cl=‘cut -fl‘,-mapper "cl"會運行正常嗎?

腳本裏無法使用別名,但是允許變量替換,例如:

$ hadoop dfs -cat samples/student_marks
alice   50
bruce   70
charlie 80
dan     75

$ c2=‘cut -f2‘; $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar     -input /user/me/samples/student_marks 
    -mapper \"$c2\" -reducer ‘cat‘  
    -output /user/me/samples/student_out 
    -jobconf mapred.job.name=‘Experiment‘

$ hadoop dfs -ls samples/student_out
Found 1 items/user/me/samples/student_out/part-00000    <r 3>   16

$ hadoop dfs -cat samples/student_out/part-00000
50
70
75
80

我可以使用UNIX pipes嗎?例如 –mapper "cut –fl | set s/foo/bar/g"管用麽?

現在不支持,而且會給出錯誤信息“java.io.IOException: Broken pipe”。這或許是一個bug,需要進一步研究。

在streaming作業中用-file選項運行一個分布式的超大可執行文件(例如,3.6G)時, 我得到了一個錯誤信息“No space left on device”。如何解決?

配置變量stream.tmpdir指定了一個目錄,在這個目錄下要進行打jar包的操作。stream.tmpdir的默認值是/tmp,你需要將這個值設置為一個有更大空間的目錄:

-jobconf stream.tmpdir=/export/bigspace/...

如何設置多個輸入目錄?

可以使用多個-input選項設置多個輸入目錄:

 hadoop jar hadoop-streaming.jar -input ‘/user/foo/dir1‘ -input ‘/user/foo/dir2‘ 

如何生成gzip格式的輸出文件?

除了純文本格式的輸出,你還可以生成gzip文件格式的輸出,你只需設置streaming作業中的選項‘-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCode’。

Streaming中如何自定義input/output format?

至少在Hadoop 0.14版本以前,不支持多個jar文件。所以當指定自定義的類時,你要把他們和原有的streaming jar打包在一起,並用這個自定義的jar包替換默認的hadoop streaming jar包。

Streaming如何解析XML文檔?

你可以使用StreamXmlRecordReader來解析XML文檔。

hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)

Map任務會把BEGIN_STRING和END_STRING之間的部分看作一條記錄。

在streaming應用程序中如何更新計數器?

streaming進程能夠使用stderr發出計數器信息。 reporter:counter:<group>,<counter>,<amount> 應該被發送到stderr來更新計數器。

如何更新streaming應用程序的狀態?

streaming進程能夠使用stderr發出狀態信息。 reporter:status:<message> 要被發送到stderr來設置狀態。

Hadoop Streaming