Python編寫Hadoop MapReduce程式
adoop 的 MapReduce 程式,使用的是 Java ,但是使用 Java 很明顯的一個弊端就是每次都要編碼、打包、上傳、執行,還真心是麻煩,想要更加簡單的使用 Hadoop 的運算能力,想要寫 MapReduce程式不那麼複雜。還真是個問題。
仔細考慮了下,python剛好切合這個需求,隨便搜了下 Python 編寫 MapReduce程式,看了個教程,接下來就寫下這篇部落格做下記錄。
Hadoop 框架使用 Java 開發的,對 Java 進行了原生的支援,不過對於其它語言也提供了 API 支援,如 Python 、 C++ 、 Perl 、 Ruby 等。在看一篇大神寫的用指令碼語言編寫Hadoop基礎程式時,需要用到一個工具,那這個工具就是 Hadoop Streaming ,顧名思義, Streaming 就是 Pipe 操作 。
前置條件:
Python 環境
Hadoop 環境( single or cluster )
最容易的 Hadoop 程式設計模型就是 Mapper 和 Reducer 的編寫,這種程式設計模型大大降低了我們對於併發、同步、容錯、一致性的要求,你只要編寫好自己的業務邏輯,就可以提交任務。然後喝杯茶,結果就出來了,前提是你的業務邏輯沒有錯誤。
使用 Hadoop Streaming ,能夠利用 Pipe 模型,而使用 Python 的巧妙之處在於處理輸入輸出的資料使用的是 STDIN 和 STDOUT ,然後 Hadoop Streaming 會接管一切,轉化成 MapReduce 模型。
我們還是使用 wordcount 例子,具體內容不再詳細解釋。下面我們先看下 mapper 的程式碼:
- #!/usr/bin/env python
- import sys
- #input comes from STDIN (standard input)
- for line in sys.stdin:
- # remove leading and trailing whitespace
- line = line.strip()
- # split the line into words
- words = line.split()
-
# increase counters
- for word in words:
- # write the results to STDOUT (standard output);
- # what we output here will be the input for the
- # Reduce step, i.e. the input for reducer.py
- # tab-delimited; the trivial word count is 1
- print'%s\t%s' % (word, 1)
- </span>
簡單解釋一下,輸入從 sys.stdin 進入,然後進行分割操作,對於每行的分割結果,打印出 word 和 count=1 , Mapper 就這麼簡單。
大家看完 Mapper 之後,會產生疑問,這個怎麼能夠實現 mapper 功能?我們跳出這個 sys.stdin 模型,再回顧下 MapReduce 的程式。在 Mapper 中,程式不關心你怎麼輸入,只關心你的輸出,這個 Mapper 程式碼會被放到各個 slave 機器上,去執行 Mapper 過程,其實可以理解為過濾、處理。
在示例中,程式的輸入會被進行一系列的處理過程,得到 word 和 count ,這個就是 slave 機器上的資料處理之後的內容。仔細理解下這個過程,對於開發程式還是相當有幫助的。
下面我們來看下 Reduce 程式, wordcount 的 reduce程式就是統計相同 word 的 count 數目,然後再輸出。我們還是直接上程式碼吧:
- #!/usr/bin/env python
- from operator import itemgetter
- import sys
- current_word = None
- current_count = 0
- word = None
- # input comes from STDIN
- for line in sys.stdin:
- # remove leading and trailing whitespace
- line = line.strip()
- # parse the input we got from mapper.py
- word, count = line.split('\t', 1)
- # convert count (currently a string) to int
- try:
- count = int(count)
- except ValueError:
- # count was not a number, so silently
- # ignore/discard this line
- continue
- # this IF-switch only works because Hadoop sorts map output
- # by key (here: word) before it is passed to the reducer
- if current_word == word:
- current_count += count
- else:
- if current_word:
- # write result to STDOUT
- print'%s\t%s' % (current_word, current_count)
- current_count = count
- current_word = word
- # do not forget to output the last word if needed!
- if current_word == word:
- print'%s\t%s' % (current_word, current_count)
- </span>
看完這個reduce程式碼,執行一下,完全沒有問題,但是未必真正能理解這個reduce的內容,我來解釋一下,明確知道執行流程的可以跳過。
reduce的內碼表不復雜,利用Reduce程式,可以得出count數目。如果當前的詞和分出來的詞一致的話,count相加,如果不一致的話,就打印出來,同時更新輸入的word和count。最後的if是打印出最後一次統計結果。
reduce的執行依賴了MapReduce模型一個要點,在Shuffle過程中,同一個key會放到同一個reduce任務中,這樣處理的是一系列連續的相同的key值,當key不一樣的時候,就是說開始統計下一個word了。
下面測試下結果:
再執行reduer程式:
- echo "foo foo quux labs foo bar quux" | python ./mapper.py | sort -k1,1 | ./reducer.py
- 02. bar 1
- 03. foo 3
- 04. labs 1
- 05. quux 2
下面就是執行Hadoop命令了,在使用Hadoop Streaming時,要使用一定的格式操作才能提交任務。
- hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar –mapper mapperfile –file mapper_file_path –reducer reducefile –file reducer_file_path –input input_path –output output_path
將自己的mapper、reducer程式碼代入上面命令中,執行一下看結果是否正確。
最後,列一下Hadoop Streaming操作的引數,以作備忘。- Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar [options]
- Options:
- -input <path> DFS input file(s) for the Map step
- -output <path> DFS output directory for the Reduce step
- -mapper <cmd|JavaClassName> The streaming command to run
- -combiner <JavaClassName> Combiner has to be a Java class
- -reducer <cmd|JavaClassName> The streaming command to run
- -file <file> File/dir to be shipped in the Job jar file
- -dfs <h:p>|local Optional. Override DFS configuration
- -jt <h:p>|local Optional. Override JobTracker configuration
- -additionalconfspec specfile Optional.
- -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
- -outputformat TextOutputFormat(default)|JavaClassName Optional.
- -partitioner JavaClassName Optional.
- -numReduceTasks <num> Optional.
- -inputreader <spec> Optional.
- -jobconf <n>=<v> Optional. Add or override a JobConf property
- -cmdenv <n>=<v> Optional. Pass env.var to streaming commands
- -cacheFile fileNameURI
- -cacheArchive fileNameURI
- -verbose
下面簡單解釋下引數的意思:
-input:DFS輸入,可以有多個input輸入,不過我一般喜歡把輸入用逗號{,}分割。
-output:DFS輸入,實際上就是Reducer輸出
-mapper:MapReduce中的Mapper,看清楚了,也可以是cmd shell命令
-combiner:這個必須是Java類
-reducer:MapReducer中的Reducer,也可以是shell命令
-file:這個file引數是用來提交本地的檔案,如本地的mapper或者reducer
-dfs:這個是可選的,用來覆蓋DFS設定。
-jt:用來覆蓋jobtracker的設定
-inputformat:輸入格式設定
-outputformat:輸出檔案的格式設定
上面的這些引數已經足夠平時的應用了,如果有更為細節的需求,就要考慮Streaming是否合適,是否適應自己的業務邏輯。
最後再說一句:按照Hadoop Streaming的執行流程,這些引數應該足夠了,但是如果我有更復雜的需求:如根據key值分離檔案;根據key值重新命名檔案;讀取HDFS上檔案配置資料;從多個數據源中讀取mapper資料,如HDFS、DataBase、HBase、Nosql等,這些比較靈活的應用使用Python Streaming都有限制,或者是我暫時還沒有看到這塊。但是目前來說,使用Hadoop Streaming操作能夠大量減少程式碼和流程,比使用Java要方便許多,特別是對於日常的、臨時的統計工作。