1. 程式人生 > >Hadoop的MapReduce例項講解—Python寫的WordCount Demo

Hadoop的MapReduce例項講解—Python寫的WordCount Demo

    MapReduce是hadoop這隻大象的核心,Hadoop 中,資料處理核心就是 MapReduce 程式設計模型。一個Map/Reduce 作業(job) 通常會把輸入的資料集切分為若干獨立的資料塊,由 map任務(task)以完全並行的方式處理它們。框架會對map的輸出先進行排序, 然後把結果輸入給reduce任務。通常作業的輸入和輸出都會被儲存在檔案系統中。因此,我們的程式設計中心主要是 mapper階段和reducer階段。
    本文將通過 MapReduce中最為經典並簡單的WordCount例項來展示MapReduce工作原理。官網和各類部落格均有相關教程,內容大同小異,但在實際操作過程中還是會遇到一些未被提到的坑,最後發現能夠順利跑完這個MapReduce的簡單例項還是真的不容易的,特地將操作過程復現一遍,供大家參考。

1.前提(環境)
搭建好Hadoop的分散式叢集,並開啟Hadoop相關程序,已經啟動了必需的各項程序:namenode、datanode、resourcemanager、nodemanager、JobHistoryServer 等。並擁有Python2.7版本。Python3版本的話需要修改程式。
2.程式碼和資料集準備
1)編寫Map程式碼
這裡我們建立一個map.py指令碼,從標準輸入(stdin)讀取資料,預設以空格分隔單詞,然後按行輸出單詞機器詞頻到標準輸出(stdout),整個Map處理過程不會統計每個單詞出現的總次數,而是直接輸出“word 1”,以便作為Reduce的輸入進行統計。

import sys
    for line in sys.stdin:
        word_list = line.strip().split(' ')    
        for word in word_list:
            print '\t'.join([word.strip(), str(1)]) 

2)編寫Reduce程式碼
這裡我們建立一個reduce.py指令碼,從標準輸入(stdin)讀取mapper.py的結果,然後統計每個單詞出現的總次數並輸出到標準輸出(stdout)。

import sys   
    cur_word = None
    sum = 0    
    for line in sys.stdin:
        ss = line.strip().split('\t')     
        if len(ss) < 2:
            continue
        word = ss[0].strip()
        count = ss[1].strip()   
        if cur_word == None:
            cur_word = word
        if cur_word != word:
            print '\t'.join([cur_word, str(sum)])
            cur_word = word
            sum = 0     
        sum += int(count)    
    print '\t'.join([cur_word, str(sum)])
    sum = 0

3)寫資料集。隨便寫一個src.txt的測試資料。

3.在Hadoop本地除錯
將三個檔案放在Hadoop伺服器的一個喜歡的位置,比如放在新建的test資料夾下,並通過以下命令在本地測試。

$ cat src.txt | python map.py | sort -k 1 | python reduce.py

執行結果如下,成功地顯示了每個詞出現的次數。
在這裡插入圖片描述
4.在Hadoop叢集上執行
1)叢集上執行第一步就是把測試資料上傳到HDFS,我的上傳路徑是“/”,也就是當前使用者的根目錄。

$ hadoop fs -put src.txt /

檢視上傳結果如下
在這裡插入圖片描述2)接下來把我們的程式碼放到Hadoop上執行,由於Hadoop是java編寫的,執行時需要匯入程式的jar包才行,正常情況下,可以用eclipse自帶的打包工具打包寫好的java程式碼。然而,我們是用python編寫的,怎麼辦呢?這個時候就需要用到Hadoop自帶的工具streaming了,它可以自動幫我們將標準輸入輸出流串起來。且streaming.jar是Hadoop可執行的。首先cd到Hadoop-streaming-2.7.4.jar所在目錄。然後執行以下指令(注意其中的程式路徑需要根據自己情況修改):

$ hadoop jar hadoop-streaming-2.7.5.jar -input ./src.txt -output ./testout3 -mapper "python map.py" -reducer "python reduce.py" -file ~/test/map -file ~/test/reduce.py

在這裡插入圖片描述在這裡插入圖片描述在這裡插入圖片描述執行成功後通過 hdfs 的cat指令檢視輸出結果。
在這裡插入圖片描述