1. 程式人生 > >MapReduce程式設計基礎

MapReduce程式設計基礎

1. WordCount示例及MapReduce程式框架

2.  MapReduce程式執行流程

3.  深入學習MapReduce程式設計(1)

4. 參考資料及程式碼下載 

<1>. WordCount示例及MapReduce程式框架

首先通過一個簡單的程式來實際執行一個MapReduce程式,然後通過這個程式我們來哦那個結一下MapReduce程式設計模型。

下載源程式:/Files/xuqiang/WordCount.rar,將該程式打包成wordcount.jar下面的命令,隨便寫一個文字檔案,這裡是WordCountMrtrial,並上傳到hdfs上,這裡的路徑是/tmp/WordCountMrtrial,執行下面的命令:

 [email protected]:~/hadoop/src/hadoop-0.21.0$ ./bin/hadoop jar wordcount.jar WordCount /tmp/WordCountMrtrial /tmp/result

 如果該任務執行完成之後,將在hdfs的/tmp/result目錄下生成類似於這樣的結果:

 gentleman11

get12 give8 go6 good9 government16

執行一個程式的基本上就是這樣一個過程,我們來看看具體程式:

main函式中首先生成一個Job物件, Job job = new Job(conf, "word count");然後設定job的MapperClass,ReducerClass,設定輸入檔案路徑FileInputFormat.addInputPath(job, new Path(otherArgs[0]));設定輸出檔案路徑:FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));等待程式執行完成:System.exit(job.waitForCompletion(true) ? 0 : 1);可以看出main中僅僅是啟動了一個job,然後設定該job相關的引數,具體實現MapReduce是mapper類和reducer類。

 TokenizerMapper類中map函式將一行分割成<K2, V2>,然後IntSumReducer的reduce將<K2, list<V2>>轉換成最終結果<K3, V3>。

 

通過這個示例基本上也能總結出簡單的MapReduce程式設計的模型:一個Mapper類,一個Reducer類,一個Driver類。

 

<2>. MapReduce程式執行流程

 這裡所描述的執行流程更加註重是從程式的角度去理解,更加全面的流程可參考[這裡]。

 

首先使用者指定待處理的檔案,在WordCount就是檔案WordCountMrtrial,這是hadoop根據設定的InputDataFormat來將輸入檔案分割成一個record(key/value對),然後將這些record傳遞給map函式,在WordCount示例中,對應的record就是<line_number行號, line_content該行內容>;

然後map函式根據輸入的record,形成<K2, V2>,在WordCount示例中形成<K2, V2>就是<single_word, word_count>,例如<"a", 1>;

如果map過程完成之後,hadoop將這些生成的<K2, V2>按照K2進行分組,形成<K2,list(V2) >,之後傳遞給reduce函式,在該函式中最終得到程式的輸出結果<K3, V3>。

<3>. 深入學習MapReduce程式設計(1)

3.1 hadoop data types

由於在hadoop需要將key/value對序列化,然後通過網路network傳送到叢集中的其他機器上,所以說hadoop中的型別需要能夠序列化。

具體而言,自定義的型別,如果一個類class實現了Writable interface的話,那麼這個可以作為value型別,如果一個class實現了WritableComparable<T> interface的話,那麼這個class可以作為value型別或者是key型別。 

hadoop本身已經實現了一些預定義的型別predefined classes,並且這些型別實現了WritableComparable<T>介面。

3.2 Mapper

如果一個類想要成為一個mapper,那麼該類需要實現Mapper介面,同時繼承自MapReduceBase。在MapReduceBase類中,兩個方法是特別需要注意的:

void configure( JobConf job):這個方法是在任務被執行之前呼叫 

void close():在任務執行完成之後呼叫

剩下的工作就是編寫map方法,原型如下:

void map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException;

 這個方法根據<K1, V1>生成<K2, V2>,然後通過context輸出。

同樣的在hadoop中預先定義瞭如下的Mapper:

 

3.3 Reducer

如果一個類想要成為Reducer的話,需要首先實現Reducer介面,然後需要繼承自MapReduceBase。

當reducer接收從mapper傳遞而來的key/value對,然後根據key來排序,分組,最終生成<K2, list<V2>> ,然後reducer根據<K2, list<V2>>生成<K3, V3>.

同樣在hadoop中預定義了一些Reducer:

 

3.4 Partitioner

 Partitioner的作用主要是將mapper執行的結果“導向directing”到reducer。如果一個類想要成為Partitioner,那麼需要實現Partitioner介面,該介面繼承自JobConfigurable,定義如下:

publicinterface Partitioner<K2, V2>extends JobConfigurable {
  
/** 
   * Get the paritition number for a given key (hence record) given the total 
   * number of partitions i.e. number of reduce-tasks for the job.
   *   
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * 
@param key the key to be paritioned.
   * 
@param value the entry value.
   * 
@param numPartitions the total number of partitions.
   * 
@return the partition number for the <code>key</code>.
   
*/int getPartition(K2 key, V2 value, int numPartitions);

hadoop將根據方法getPartition的返回值確定將mapper的值傳送到那個reducer上。返回值相同的key/value對將被“導向“至同一個reducer。

3.5 Input Data Format and Output Data Format

3.5.1 Input Data Format 

上面我們的假設是MapReduce程式的輸入是key/value對,也就是<K1, V1>,但是實際上一般情況下MapReduce程式的輸入是big file的形式,那麼如何將這個檔案轉換成<K1, V1>,即file -> <K1, V1>。這就需要使用InputFormat介面了。 

下面是幾個常用InputFormat的實現類:

 

當然除了使用hadoop預先定義的InputDataFormat之外,還可以自定義,這是需要實現InputFormat介面。該介面僅僅包含兩個方法:

 InputSplit[] getSplits(JobConf job, int numSplits) throws  IOException;該介面實現將大檔案分割成小塊split。   RecordReader<K, V> getRecordReader(InputSplit split,                                      JobConf job, 

                                     Reporter reporter) throws IOException; 

該方法輸入分割成的split,然後返回RecordReader,通過RecordReader來遍歷該split內的record。 

3.5.2 Output Data Format

每個reducer將自己的輸出寫入到結果檔案中,這是使用output data format來配置輸出的檔案的格式。hadoop預先實現了:

 

3.6 Streaming in Hadoop

3.6.1 執行流程

我們知道在linux中存在所謂的“流”的概念,也就是說我們可以使用下面的命令:

cat input.txt | RandomSample.py 10 >sampled_output.txt 

同樣在hadoop中我們也可以使用類似的命令,顯然這樣能夠在很大程度上加快程式的開發程序。下面來看看hadoop中流執行的過程:

 

hadoop streaming從標磚輸入STDIN讀取資料,預設的情況下使用\t來分割每行,如果不存在\t的話,那麼這時正行的內容將被看作是key,而此時的value內容為空;

然後呼叫mapper程式,輸出<K2, V2>;

之後,呼叫Partitioner來將<K2, V2>輸出到對應的reducer上;

reducer根據輸入的<K2, list(V2)> 得到最終結果<K3, V3>並輸出到STDOUT上。 

3.6.2 簡單示例程式 

下面我們假設需要做這樣一個工作,輸入一個檔案,檔案中每行是一個數字,然後得到該檔案中的數字的最大值(當然這裡可以使用streaming中自帶的Aggregate)。 首先我們編寫一個python檔案(如果對python不是很熟悉,看看[這裡]):

3.6.2.1 準備資料

 [email protected]:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >url1

 [email protected]:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >url2

上傳到hdfs上:

 [email protected]:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -mkdir urls

[email protected]:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url1 urls/ [email protected]:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url2 urls/

3.6.2.2 編寫mapper multifetch.py

#!/usr/bin/env pythonimport sys, urllib, re

title_re 
= re.compile("<title>(.*?)</title>",
        re.MULTILINE 
| re.DOTALL | re.IGNORECASE)

for line in sys.stdin:
    
# We assume that we are fed a series of URLs, one per line    url = line.strip()
    
# Fetch the content and output the title (pairs are tab-delimited)    match = title_re.search(urllib.urlopen(url).read())
    
if match:
        
print url, "\t", match.group(1).strip()

該檔案的主要作用是給定一個url,然後輸出該url代表的html頁面的title部分。

在本地測試一下該程式:

[email protected]:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >urls [email protected]:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >>urls

[email protected]:~/hadoop/src/hadoop-0.21.0$ sudo chmod u+x ./multifetch.py 

 [email protected]:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py 將輸出:

 http://www.cs.brandeis.edu Computer Science Department | Brandeis University

http://www.nytimes.com The New York Times - Breaking News, World News &amp; Multimedia

3.6.2.3 編寫reducer reducer.py

編寫reducer.py檔案 

 #!/usr/bin/env python from operator import itemgetter
import sys

for line in sys.stdin:
    line 
= line.strip()
    
print line

[email protected]:~/hadoop/src/hadoop-0.21.0$ chmod u+x ./reducer.py  

現在我們的mapper和reducer已經準備好了,那麼首先在本地上執行測試一下程式的功能,下面的命令模擬在hadoop上執行的過程:

首先mapper從stdin讀取資料,這裡是一行;

然後讀取該行的內容作為一個url,然後得到該url代表的html的title的內容,輸出<url, url-title-content>;

呼叫sort命令將mapper輸出排序;

將排序完成的結果交給reducer,這裡的reducer僅僅是將結果輸出。 

[email protected]:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py | sort | ./reducer.py 
http:
//www.cs.brandeis.edu     Computer Science Department | Brandeis University
http://www.nytimes.com     The New York Times - Breaking News, World News &amp; Multimedia 

顯然程式能夠正確 

3.6.2.4 在hadoop streaming上執行

[email protected]:~/hadoop/src/hadoop-0.21.0$ bin/hadoop jar ./mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
>-mapper /home/xuqiang/hadoop/src/hadoop-0.21.0/multifetch.py \
>-reducer /home/xuqiang/hadoop/src/hadoop-0.21.0/reducer.py \
>-input urls/* \
> -output titles 

 程式執行完成之後,檢視執行結果:

 [email protected]:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -cat titles/part-00000

<4>. 參考資料及程式碼下載

<Hadoop In Action>