使用python寫一個最基本的mapreduce程序
阿新 • • 發佈:2019-03-15
sheng words == reducer logs 註意 例子 one split
一個mapreduce程序大致分成三個部分,第一部分是mapper文件,第二個就是reducer文件,第三部分就是使用hadoop command 執行程序。
在這個過程中,困惑我最久的一個問題就是在hadoop command中hadoop-streaming 也就是streaming jar包的路徑。
路徑大概是這樣的:
cd ~
cd /usr/local/hadoop-2.7.3/share/hadoop/tools/lib
#在這個文件下,我們可以找到你 hadoop-streaming-2.7.3.jar
這個路徑是參考的這裏
這個最基本的mapreduce程序我主要參考了三個博客:
第一個-主要是參考這個博客的mapper和reducer的寫法-在這個博客中它在練習中給出了只寫mapper執行文件的一個例子
第二個博客-主要參考的這個博客的runsh的寫法
第三個博客-主要是參考這個博客的將本地文件上傳到hdfs文件系統中
首先對於mapper文件
mapper.py
#!/usr/bin/env python import sys # input comes from STDIN (standard input) for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words words = line.split() # increase counters for word in words: # write the results to STDOUT (standard output); # what we output here will be the input for the # Reduce step, i.e. the input for reducer.py # # tab-delimited; the trivial word count is 1 print '%s\t%s' % (word, 1) #上面這個文件我們得到的結果大概是每個單詞對應一個數字1
對於reducer文件:reducer.py
#!/usr/bin/env python from operator import itemgetter import sys current_word = None current_count = 0 word = None # input comes from STDIN for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # parse the input we got from mapper.py word, count = line.split('\t', 1) # convert count (currently a string) to int try: count = int(count) except ValueError: # count was not a number, so silently # ignore/discard this line continue # this IF-switch only works because Hadoop sorts map output # by key (here: word) before it is passed to the reducer if current_word == word: current_count += count else: if current_word: # write result to STDOUT print '%s\t%s' % (current_word, current_count) current_count = count current_word = word # do not forget to output the last word if needed! if current_word == word: print '%s\t%s' % (current_word, current_count)
對上面兩個代碼先進行一個本地的檢測
vim test.txt
foo foo quux labs foo bar quux
cat test.txt|python mapper.py
cat test.txt|python mapper.py|sort|python reducer.py
##註意在這裏我們執行萬mapper之後我們進行了一個排序,所以對於相同單詞是處於相鄰位置的,這樣在執行reducer文件的時候代碼可以寫的比較簡單一點
然後在hadoop集群中跑這個代碼
首先講這個test.txt 上傳到相應的hdfs文件系統中,使用的命令模式如下:
hadoop fs -put ./test.txt /dw_ext/weibo_bigdata_ugrowth/mds/
然後寫一個run.sh
HADOOP_CMD="/usr/local/hadoop-2.7.3/bin/hadoop" # hadoop的bin的路徑
STREAM_JAR_PATH="/usr/local/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar" ## streaming jar包的路徑
INPUT_FILE_PATH="/dw_ext/weibo_bigdata_ugrowth/mds/src.txt" #hadoop集群上的資源輸入路徑
#需要註意的是intput文件必須是在hadooop集群上的hdfs文件中的,所以必須將本地文件上傳到集群上
OUTPUT_PATH="/dw_ext/weibo_bigdata_ugrowth/mds/output"
#需要註意的是這output文件必須是不存在的目錄,因為我已經執行過一次了,所以這裏我把這個目錄通過下面的代碼刪掉
$HADOOP_CMD fs -rmr $OUTPUT_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH -input $INPUT_FILE_PATH -output $OUTPUT_PATH -mapper "python mapper.py" -reducer "python reducer.py" -file ./mapper.py -file ./reducer.py
# -mapper:用戶自己寫的mapper程序,可以是可執行文件或者腳本
# -reducer:用戶自己寫的reducer程序,可以是可執行文件或者腳本
# -file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
明天看這個
https://www.cnblogs.com/shay-zhangjin/p/7714868.html
https://www.cnblogs.com/kaituorensheng/p/3826114.html
使用python寫一個最基本的mapreduce程序