轉:用MapReduce進行資料密集型文字處理 – 本地聚合(上)
因為最近忙於Coursera提供 的一些課程,我已經有一段時間沒有寫部落格了。這些課程非常有意思,值得一看。我買了一本書《Data-Intensive Processing with MapReduce》,作者是Jimmy和Chris Dyer。書裡以偽碼形式總結了一些了MapReduce的重要演算法。。我打算用真正的hadoop程式碼來實現這本書中第3-6章中出現過的演算法,以Tom White的《Hadoop經典指南》作為參考。我假設本文的讀者已經瞭解Hadoop和MapReduce,所以本文不再詳述基礎概念。讓我們直接跳到第3章-MapReduce演算法設計,從本地聚合開始。
本地聚合(Local Aggregation)
從比較高的抽象層面上來講,mapper輸出資料的時候要先把中間結果寫到磁碟上,然後穿過網路傳給reducer處理。對於一個mapreduce job來說,將資料寫磁碟以及之後的網路傳輸的代價高昂,因為它們會大大增加延遲。所以,應該儘可能減少mapper產生的資料量,這樣才能加快job的處理速度。本地聚合就是這樣一種減少中間資料量提高job效率的技術。本地聚合並不能代替reducer,因為reducer可以聚集來自不同mapper的具有同樣key的資料。我們有三種本地聚合的方法:
1.使用Hadoop Combiner的功能
當然任何優化都要考慮一些其他因素,我們將在後面討論這些。
為了演示本地聚合,我在我的MacBookPro上用
Combiners
combiner功能由繼承了Reducer class的物件實現。事實上,在我們的例子裡,我們會重用word count中的reducer來作為combiner。combiner 在配置MapReduce job的時候指定,就像這樣:
- job.setReducerClass(TokenCountReducer.class);
下面是reducer的程式碼:
- publicclassTokenCountReducerextendsReducer<Text,IntWritable,Text,IntWritable>{
- @Override
- protectedvoid reduce(Text key,Iterable<IntWritable> values,Context context)
- throwsIOException,InterruptedException{
- int count =0;
- for(IntWritable value : values){
- count+= value.get();
- }
- context.write(key,newIntWritable(count));
- }
- }
combiner的作用就如它的名字,聚合資料以儘量減少shuffle階段的網路傳輸量。如前所述,reducer仍然需要把來自不同mapper的同樣的key聚集起來。因為combiner功能只是對過程的一個優化,所以Hadoop框架不能保證combiner會被呼叫多少次。(配置了combinere就一定會執行,但是執行1次還是n次是預先不確定的)
在Mapper聚合的方法1
不用combiner的話,替代方法之一隻需要對我們原來的word count mapper做一個小小的修改:
- publicclassPerDocumentMapperextendsMapper<LongWritable,Text,Text,IntWritable>{
- @Override
- protectedvoid map(LongWritable key,Text value,Context context)
- throwsIOException,InterruptedException{
- IntWritable writableCount =newIntWritable();
- Text text =newText();
- Map<String,Integer> tokenMap =newHashMap<String,Integer>();
- StringTokenizer tokenizer =newStringTokenizer(value.toString());
- while(tokenizer.hasMoreElements()){
- String token = tokenizer.nextToken();
- Integer count = tokenMap.get(token);
- if(count ==null) count =newInteger(0);
- count+=1;
- tokenMap.put(token,count);
- }
- Set<String> keys = tokenMap.keySet();
- for(String s : keys){
- text.set(s);
- writableCount.set(tokenMap.get(s));
- context.write(text,writableCount);
- }
- }
- }
如我們所看到的,輸出的詞的計數不再是1,我們用一個map記錄處理過的每個詞。處理完畢一行中的所有詞,然後遍歷這個map,輸出每個詞在一行中的出現次數。
在Mapper聚合的方法2
在mapper中聚合的第二種方法與上面的例子非常相似,但也有兩處不同 – 在什麼時候建立hashmap以及什麼時候輸出hashmap中的結果。在上面的例子裡,在每次呼叫map方法的時候建立map並在呼叫完成的時候輸出。在這個例子裡,我們會把map作為一個例項變數並在mapper的setUp方法裡初始化。同樣,map的內容要等到所有的map方法呼叫都完成之後,呼叫cleanUp方法的時候才輸出。
- publicclassAllDocumentMapperextendsMapper<LongWritable,Text,Text,IntWritable>{
- privateMap<String,Integer> tokenMap;
- @Override
- protectedvoid setup(Context context)throwsIOException,InterruptedException{
- tokenMap =newHashMap<String,Integer>();
- }
- @Override
- protectedvoid map(LongWritable key,Text value,Context context)
- throwsIOException,InterruptedException{
- StringTokenizer tokenizer =newStringTokenizer(value.toString());
- while(tokenizer.hasMoreElements()){
- String token = tokenizer.nextToken();
- Integer count = tokenMap.get(token);
- if(count ==null) count =newInteger(0);
- count+=1;
- tokenMap.put(token,count);
- }
- }
- @Override
- protectedvoid cleanup(Context context)throwsIOException,InterruptedException{
- IntWritable writableCount =newIntWritable();
- Text text =newText();
- Set<String> keys = tokenMap.keySet();
- for(String s : keys){
- text.set(s);
- writableCount.set(tokenMap.get(s));
- context.write(text,writableCount);
- }
- }
- }
正如上面的程式碼所示,在 mapper裡,跨越所有map方法呼叫,記錄每個詞的出現次數。通過這樣做,大大減少了傳送到reducer的記錄數量,能夠減少MapReduce任務的執行時間。達到的效果與使用MapReduce框架的combiner功能相同,但是這種情況下你要自己保證你的聚合程式碼是正確的。但是使用這種方法的時候要注意,在map方法呼叫過程中始終保持狀態是有問題的,這有悖於“map”功能的原義。而且,在map呼叫過程中保持狀態也需要關注你的記憶體使用。總之,根據不同情況來做權衡,選擇最合適的辦法。
結果
現在讓我們來看一下不同mapper的結果。因為job執行在偽分散式模式下,這個執行時間不足以參考,不過我們仍然可以推斷出使用了本地聚合之後是如何影響真實叢集上執行的MapReduce job的效率的。
每個詞輸出一次的Mapper:
- 12/09/1321:25:32 INFO mapred.JobClient:Reduce shuffle bytes=366010
- 12/09/1321:25:32 INFO mapred.JobClient:Reduce output records=7657
- 12/09/1321:25:32 INFO mapred.JobClient:SpilledRecords=63118
- 12/09/1321:25:32 INFO mapred.JobClient:Map output bytes=302886
在mapper中聚合方法1:
- 12/09/1321:28:15 INFO mapred.JobClient:Reduce shuffle bytes=354112
- 12/09/1321:28:15 INFO mapred.JobClient:Reduce output records=7657
- 12/09/1321:28:15 INFO mapred.JobClient:SpilledRecords=60704
- 12/09/1321:28:15 INFO mapred.JobClient:Map output bytes=293402
在mapper中聚合方法2:
- 12/09/1321:30:49 INFO mapred.JobClient:Reduce shuffle bytes=105885
- 12/09/1321:30:49 INFO mapred.JobClient:Reduce output records=7657
- 12/09/1321:30:49 INFO mapred.JobClient:SpilledRecords=15314
- 12/09/1321:30:49 INFO mapred.JobClient:Map output bytes=90565
使用了Combiner:
- 12/09/1321:22:18 INFO mapred.JobClient:Reduce shuffle bytes=105885
- 12/09/1321:22:18 INFO mapred.JobClient:Reduce output records=7657
- 12/09/1321:22:18 INFO mapred.JobClient:SpilledRecords=15314
- 12/09/1321:22:18 INFO mapred.JobClient:Map output bytes=302886
- 12/09/1321:22:18 INFO mapred.JobClient:Combine input records=31559
- 12/09/1321:22:18 INFO mapred.JobClient:Combine output records=7657
正如所料,沒有做任何聚合的Mapper效果最差,然後是“在mapper中聚合方法1”,差之了了。“在mapper中聚合方法2”與使用了combiner的結果很近似。比起前兩種方法,他們節省了2/3的shuffle位元組數。這等於減少了同樣數量的網路資料傳輸量,十分有利於提高MapReduce job的執行效率。不過要記住,方法2或者combiner並不一定能夠應用於所有的MapReduce jobs, word count很適合於這種場景,但是別的情況可不一定。
結論
正如你看到的,使用mapper裡聚合方法和combiner是有好處的,不過當你在尋求提升MapReduce jobs的效能的時候你應該多考慮一些因素。至於選哪種方法,這取決於你如何權衡。