Hadoop Streaming的使用
資料來源:https://zhuanlan.zhihu.com/p/34903460
重點記錄:
mapper的角色:hadoop將使用者提交的mapper可執行程式或指令碼作為一個單獨的程序載入起來,這個程序我們稱之為mapper程序,hadoop不斷地將檔案片段轉換為行,傳遞到我們的mapper程序中,mapper程序通過標準輸入的方式一行一行地獲取這些資料,然後設法將其轉換為鍵值對,再通過標準輸出的形式將這些鍵值對按照一對兒一行的方式輸出出去。
雖然在我們的mapper函式中,我們自己能分得清key/value(比方說有可能在我們的程式碼中使用的是string key,int value),但是當我們採用標準輸出之後,key value是列印到一行作為結果輸出的(比如sys.stdout.write("%s\t%s\n"%(birthyear,gender))),因此我們為了保證hadoop能從中鑑別出我們的鍵值對,鍵值對中一定要以分隔符'\t'即Tab(也可自定義分隔符)字元分隔,這樣才能保證hadoop正確地為我們進行partitoner、shuffle等等過程。
reducer的角色:hadoop將使用者提交的reducer可執行程式或指令碼同樣作為一個單獨的程序載入起來,這個程序我們稱之為reducer程序,hadoop不斷地將鍵值對(按鍵排序)按照一對兒一行的方式傳遞到reducer程序中,reducer程序同樣通過標準輸入的方式按行獲取這些鍵值對兒,進行自定義計算後將結果通過標準輸出的形式輸出出去。
在reducer這個過程中需要注意的是:傳遞進reducer的鍵值對是按照鍵排過序的,這點是由MR框架的sort過程保證的,因此如果讀到一個鍵與前一個鍵不同,我們就可以知道當前key對應的pairs已經結束了,接下來將是新的key對應的pairs。
Hadoop Streaming的使用方式
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/.../hadoop-streaming.jar [genericOptions] [streamingOptions]
在這行命令中,其實是有先後順序的,我們一定要保證[genericOptions]寫在[streamingOptions]之前,否則hadoop streaming命令將失效。
按照以上的格式使用該命令需要在安裝hadoop時設定好環境變數HADOOP_HOME,將其值設定為hadoop的安裝目錄,當然如果覺得以上的呼叫方式還是麻煩的話,也可以把hadoop設定進系統的PATH環境變數並用hadoop jar $HADOOP_HOME/.../hadoop-streaming.jar [genericOptions] [streamingOptions]的格式呼叫。
Generic options supported are
- -conf <configuration file> specify an application configuration file
- -D <property=value> use value for given property
- -fs <local|namenode:port> specify a namenode
- -jt <local|jobtracker:port> specify a job tracker
- -files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
- -libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
- -archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
注意:由於版本的原因,files經常被寫成CacheFile,D經常被寫成jobconf,archives經常被寫成cacheArchive。
除了[Generic option]剩餘的就是streamingOptions
在叢集上執行MapReduce程式
使用hadoop streaming命令來在叢集上執行程式時,請注意一定要將-D,-files引數放在所有引數前面,我們之前說過genericOptions一定要放在streamingOptions前面,而-files -D都屬於genericOptions,所以你懂的,之前曾把-files放在後面,執行起來後各種錯誤。另外不要忘了使用-D stream.non.zero.exit.is.failure=false 來避免MR未返回0時異常退出,原因是:-D stream.non.zero.exit.is.failure=false/true 指定當mapper和reducer未返回0時,hadoop是否該認為此任務執行失敗。預設為true。當mapper和reducer的返回值不是0或沒有返回值時,hadoop將認為該任務為異常任務,將被再次執行,預設嘗試4次都不是0,整個job都將失敗。因此,如果我們在編寫mapper和reducer未返回0時,則應該將該引數設定為false,否則hadoop streaming任務將報出異常。
執行程式碼例項如下所示:
hadoop jar /modules/hadoop-2.6.0-cdh5.11.1/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.11.1.jar \
> -D stream.non.zero.exit.is.failure=false \
> -files /home/sunyunfeng/jars/mapper.py,/home/sunyunfeng/jars/reducer.py \
> -input /user/sunyunfeng/sampleTest.csv \
> -output /user/sunyunfeng/output \
> -mapper "python mapper.py" \
> -reducer "python reducer.py"