1. 程式人生 > >Python編寫Hadoop MapReduce程式

Python編寫Hadoop MapReduce程式

adoop 的 MapReduce 程式,使用的是 Java ,但是使用 Java 很明顯的一個弊端就是每次都要編碼、打包、上傳、執行,還真心是麻煩,想要更加簡單的使用 Hadoop 的運算能力,想要寫 MapReduce程式不那麼複雜。還真是個問題。

仔細考慮了下,python剛好切合這個需求,隨便搜了下 Python 編寫 MapReduce程式,看了個教程,接下來就寫下這篇部落格做下記錄。

Hadoop 框架使用 Java 開發的,對 Java 進行了原生的支援,不過對於其它語言也提供了 API 支援,如 Python 、 C++ 、 Perl 、 Ruby 等。在看一篇大神寫的用指令碼語言編寫Hadoop基礎程式時,需要用到一個工具,那這個工具就是 Hadoop Streaming ,顧名思義, Streaming 就是 Pipe 操作 。

前置條件:

Python 環境

Hadoop 環境( single or cluster )

最容易的 Hadoop 程式設計模型就是 Mapper 和 Reducer 的編寫,這種程式設計模型大大降低了我們對於併發、同步、容錯、一致性的要求,你只要編寫好自己的業務邏輯,就可以提交任務。然後喝杯茶,結果就出來了,前提是你的業務邏輯沒有錯誤。

使用 Hadoop Streaming ,能夠利用 Pipe 模型,而使用 Python 的巧妙之處在於處理輸入輸出的資料使用的是 STDIN 和 STDOUT ,然後 Hadoop Streaming 會接管一切,轉化成 MapReduce 模型。

我們還是使用 wordcount 例子,具體內容不再詳細解釋。下面我們先看下 mapper 的程式碼:

  1. #!/usr/bin/env python
  2. import sys  
  3. #input comes from STDIN (standard input)
  4. for line in sys.stdin:  
  5.     # remove leading and trailing whitespace
  6.     line = line.strip()  
  7.     # split the line into words
  8.     words = line.split()  
  9.     # increase counters
  10.     for word in words:  
  11.         # write the results to STDOUT (standard output);
  12.         # what we output here will be the input for the
  13.         # Reduce step, i.e. the input for reducer.py
  14.         # tab-delimited; the trivial word count is 1
  15.         print'%s\t%s' % (word, 1)  
  16. </span>  

簡單解釋一下,輸入從 sys.stdin 進入,然後進行分割操作,對於每行的分割結果,打印出 word 和 count=1 , Mapper 就這麼簡單。

大家看完 Mapper 之後,會產生疑問,這個怎麼能夠實現 mapper 功能?我們跳出這個 sys.stdin 模型,再回顧下 MapReduce 的程式。在 Mapper 中,程式不關心你怎麼輸入,只關心你的輸出,這個 Mapper 程式碼會被放到各個 slave 機器上,去執行 Mapper 過程,其實可以理解為過濾、處理。

在示例中,程式的輸入會被進行一系列的處理過程,得到 word 和 count ,這個就是 slave 機器上的資料處理之後的內容。仔細理解下這個過程,對於開發程式還是相當有幫助的。

下面我們來看下 Reduce 程式, wordcount 的 reduce程式就是統計相同 word 的 count 數目,然後再輸出。我們還是直接上程式碼吧:

  1. #!/usr/bin/env python
  2. from operator import itemgetter  
  3. import sys  
  4. current_word = None
  5. current_count = 0
  6. word = None
  7. # input comes from STDIN
  8. for line in sys.stdin:  
  9.     # remove leading and trailing whitespace
  10.     line = line.strip()  
  11.     # parse the input we got from mapper.py
  12.     word, count = line.split('\t'1)  
  13.     # convert count (currently a string) to int
  14.     try:  
  15.         count = int(count)  
  16.     except ValueError:  
  17.         # count was not a number, so silently
  18.         # ignore/discard this line
  19.         continue
  20.     # this IF-switch only works because Hadoop sorts map output
  21.     # by key (here: word) before it is passed to the reducer
  22.     if current_word == word:  
  23.         current_count += count  
  24.     else:  
  25.         if current_word:  
  26.             # write result to STDOUT
  27.             print'%s\t%s' % (current_word, current_count)  
  28.         current_count = count  
  29.         current_word = word  
  30. # do not forget to output the last word if needed!
  31. if current_word == word:  
  32.     print'%s\t%s' % (current_word, current_count)  
  33. </span>  

看完這個reduce程式碼,執行一下,完全沒有問題,但是未必真正能理解這個reduce的內容,我來解釋一下,明確知道執行流程的可以跳過。

reduce的內碼表不復雜,利用Reduce程式,可以得出count數目。如果當前的詞和分出來的詞一致的話,count相加,如果不一致的話,就打印出來,同時更新輸入的word和count。最後的if是打印出最後一次統計結果。

reduce的執行依賴了MapReduce模型一個要點,在Shuffle過程中,同一個key會放到同一個reduce任務中,這樣處理的是一系列連續的相同的key值,當key不一樣的時候,就是說開始統計下一個word了。

下面測試下結果:

再執行reduer程式:

  1. echo "foo foo quux labs foo bar quux" | python ./mapper.py | sort -k1,1 | ./reducer.py    
  2. 02.  bar     1
  3. 03.  foo     3
  4. 04.  labs    1
  5. 05.  quux    2

下面就是執行Hadoop命令了,在使用Hadoop Streaming時,要使用一定的格式操作才能提交任務。

  1. hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar –mapper mapperfile –file mapper_file_path –reducer reducefile –file reducer_file_path –input input_path –output output_path  

將自己的mapper、reducer程式碼代入上面命令中,執行一下看結果是否正確。

最後,列一下Hadoop Streaming操作的引數,以作備忘。
  1. Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar [options]  
  2.  Options:  
  3.    -input    <path>                   DFS input file(s) for the Map step  
  4.    -output   <path>                   DFS output directory for the Reduce step  
  5.    -mapper   <cmd|JavaClassName>      The streaming command to run  
  6.    -combiner <JavaClassName>          Combiner has to be a Java class  
  7.    -reducer  <cmd|JavaClassName>      The streaming command to run  
  8.    -file     <file>                   File/dir to be shipped in the Job jar file  
  9.    -dfs    <h:p>|local                Optional. Override DFS configuration  
  10.    -jt     <h:p>|local                Optional. Override JobTracker configuration  
  11.    -additionalconfspec specfile       Optional.  
  12.    -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.  
  13.    -outputformat TextOutputFormat(default)|JavaClassName  Optional.  
  14.    -partitioner JavaClassName         Optional.  
  15.    -numReduceTasks <num>              Optional.  
  16.    -inputreader <spec>                Optional.  
  17.    -jobconf  <n>=<v>                  Optional. Add or override a JobConf property  
  18.    -cmdenv   <n>=<v>                  Optional. Pass env.var to streaming commands  
  19.    -cacheFile fileNameURI  
  20.    -cacheArchive fileNameURI  
  21.    -verbose  

下面簡單解釋下引數的意思:

-input:DFS輸入,可以有多個input輸入,不過我一般喜歡把輸入用逗號{,}分割。

-output:DFS輸入,實際上就是Reducer輸出

-mapper:MapReduce中的Mapper,看清楚了,也可以是cmd shell命令

-combiner:這個必須是Java類

-reducer:MapReducer中的Reducer,也可以是shell命令

-file:這個file引數是用來提交本地的檔案,如本地的mapper或者reducer

-dfs:這個是可選的,用來覆蓋DFS設定。

-jt:用來覆蓋jobtracker的設定

-inputformat:輸入格式設定

-outputformat:輸出檔案的格式設定

上面的這些引數已經足夠平時的應用了,如果有更為細節的需求,就要考慮Streaming是否合適,是否適應自己的業務邏輯。

最後再說一句:按照Hadoop Streaming的執行流程,這些引數應該足夠了,但是如果我有更復雜的需求:如根據key值分離檔案;根據key值重新命名檔案;讀取HDFS上檔案配置資料;從多個數據源中讀取mapper資料,如HDFS、DataBase、HBase、Nosql等,這些比較靈活的應用使用Python Streaming都有限制,或者是我暫時還沒有看到這塊。但是目前來說,使用Hadoop Streaming操作能夠大量減少程式碼和流程,比使用Java要方便許多,特別是對於日常的、臨時的統計工作。