MapReduce理解-深入理解MapReduce
前面的幾篇部落格主要介紹了Hadoop的儲存HDFS,接下來幾篇部落格主要介紹Hadoop的計算框架MapReduce。本片部落格主要講解MapReduce框架的具體執行流程,以及shuffle過程,當然這方面的技術部落格已經特別多而且都寫得很優秀,我寫本篇部落格之前也有過相關閱讀,受益匪淺。對一些部落格和資料的參考都會才部落格下方參考資料中列出。
MapReduce理解
- MapRedeuce,我們可以把它分開來理解:
-
對映(Mapping) :對集合裡的每個目標應用同一個操作。即,如果你想把表單裡每個單元格乘以二,那麼把這個函式單獨地應用在每個單元格上的操作就屬於mapping(這裡體現了移動計算
- 化簡(Reducing):遍歷集合中的元素來返回一個綜合的結果。即,輸出表單裡一列數字的和這個任務屬於reducing。
-
把需要計算的東西放入到MapReduce中進行計算,然後返回一個我們期望的結果。所以首先我們需要一個來源(需要計算的東西)即輸入(input),然後MapReduce操作這個輸入(input),通過定義好的計算模型,最後得到一個(期望的結果)輸出(output)。
-
計算模型
在這裡我們主要討論的是MapReduce計算模型:在執行一個mapreduce計算任務時候,任務過程被分為兩個階段:map階段和reduce階段,每個階段都是用鍵值對(key/value)作為輸入(input)和輸出(output)。而程式設計師要做的就是定義好這兩個階段的函式:map函式和reduce函式。
例項程式碼
- 以MapReduce統計單詞次數為例(虛擬碼),主要四個模組來講解,如上圖計算框架:
-
Input,資料讀入
1 2 3 4 5 6
// 設定資料輸入來源 FileInputFormat.setInputPaths(job, args[0]); FileInputFormat.setInputDirRecursive(job, true); //遞迴 job.setInputFormatClass(TextInputFormat.class); //設定輸入格式 //TextInputFormat,一種預設的文字輸入格式,Mapper一次讀取文字中的一行資料。
-
使用Mapper計算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
//設定Job的Mapper計算類和K2、V2型別 job.setMapperClass(WordCountMapper.class); //1.設定Mapper類 job.setMapOutputKeyClass(Text.class); //設定Mapper輸出Key的型別 job.setMapOutputValueClass(LongWritable.class);//設定Mapper輸出Value的型別 //WordCountMapper類 /** * 自定義的Map 需要繼承Mapper * K1 : 行序號 * V1 : 行資訊 * K2 : 單詞 * V2 : 次數 */ public static class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> { Text k2 = new Text() ; LongWritable v2 = new LongWritable(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1. 獲取行資訊 String line = value.toString(); //2. 獲取行的所用單詞 String[] words = line.split("\t");//這裡假設一行文字單詞分隔符為"\t" for (String word : words) { k2.set(word.getBytes()) ; //設定鍵 v2.set(1); //設定值 context.write(k2,v2); } } }
-
使用Reducer合併計算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
//設定Job的Reducer計算類和K3、V3型別 job.setReducerClass(WordCountReducer.class); //自定義的Reducer類 job.setOutputKeyClass(Text.class); //輸出Key型別 job.setOutputValueClass(LongWritable.class); //輸出Value型別 //WordCountReducer 類 /** * 自定義的Reduce 需要繼承Reducer * K2 : 字串 * V3 : 次數(分組) * K3 : 字串 * V3 : 次數(統計總的) */ public static class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ LongWritable v3 = new LongWritable() ; int sum = 0 ; protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { sum = 0 ; for (LongWritable value : values) { sum +=value.get() ; } v3.set(sum); context.write( key , v3 ); } }
-
Output,資料寫出
1
FileOutputFormat.setOutputPath(job, new Path(args[1]));
執行機制
- 下面從兩個角度來講解MapReduce的執行機制:
各個角色實體
-
程式執行時過程設計到的一個角色實體
1.1. Client:編寫mapreduce程式,配置作業,提交作業的客戶端 ;
1.2. ResourceManager:叢集中的資源分配管理 ;
1.3. NodeManager:啟動和監管各自節點上的計算資源 ;
1.4. ApplicationMaster:每個程式對應一個AM,負責程式的任務排程,本身也是執行在NM的Container中 ;
1.5. HDFS:分散式檔案系統,儲存作業的資料、配置資訊等等。 -
客戶端提交Job
2.1. 客戶端編寫好Job後,呼叫Job例項的Submit()或者waitForCompletion()方法提交作業;
2.2. 客戶端向ResourceManager請求分配一個Application ID,客戶端會對程式的輸出、輸入路徑進行檢查,如果沒有問題,進行作業輸入分片的計算。 -
Job提交到ResourceManager
3.1. 將作業執行所需要的資源拷貝到HDFS中(jar包、配置檔案和計算出來的輸入分片資訊等);
3.2. 呼叫ResourceManager的submitApplication方法將作業提交到ResourceManager。 -
給作業分配ApplicationMaster
4.1. ResourceManager收到submitApplication方法的呼叫之後會命令一個NodeManager啟動一個Container ;
4.2. 在該NodeManager的Container上啟動管理該作業的ApplicationMaster程序。 -
ApplicationMaster初始化作業
5.1. ApplicationMaster對作業進行初始化操作;
5.2. ApplicationMaster從HDFS中獲得輸入分片資訊(map、reduce任務數) -
任務分配
6.1. ApplicationMaster為其每個map和reduce任務向RM請求計算資源;
6.2. map任務優先於reduce任,map資料優先考慮本地化的資料。 -
任務執行,在 Container 上啟動任務(通過YarnChild程序來執行),執行map/reduce任務。
時間先後順序
-
輸入分片(input split)
每個輸入分片會讓一個map任務來處理,預設情況下,以HDFS的一個塊的大小(預設為128M,可以設定)為一個分片。map輸出的結果會暫且放在一個環形記憶體緩衝區中(預設mapreduce.task.io.sort.mb=100M
),當該緩衝區快要溢位時(預設mapreduce.map.sort.spill.percent=0.8
),會在本地檔案系統中建立一個溢位檔案,將該緩衝區中的資料寫入這個檔案; -
map階段:由我們自己編寫,最後呼叫 context.write(…);
-
partition分割槽階段
3.1. 在map中呼叫 context.write(k2,v2)方法輸出,該方法會立刻呼叫 Partitioner類對資料進行分割槽,一個分割槽對應一個 reduce task。
3.2. 預設的分割槽實現類是 HashPartitioner ,根據k2的雜湊值 % numReduceTasks
,可能出現“資料傾斜”現象。
3.3. 可以自定義 partition ,呼叫 job.setPartitioner(…)自己定義分割槽函式。 -
combiner合併階段:將屬於同一個reduce處理的輸出結果進行合併操作
4.1. 是可選的;
4.2. 目的有三個:1.減少Key-Value對;2.減少網路傳輸;3.減少Reduce的處理。 -
shuffle階段:即Map和Reduce中間的這個過程
5.1. 首先 map 在做輸出時候會在記憶體裡開啟一個環形記憶體緩衝區,專門用來做輸出,同時map還會啟動一個守護執行緒;
5.2. 如緩衝區的記憶體達到了閾值的80%,守護執行緒就會把內容寫到磁碟上,這個過程叫spill,另外的20%記憶體可以繼續寫入要寫進磁碟的資料;
5.3. 寫入磁碟和寫入記憶體操作是互不干擾的,如果快取區被撐滿了,那麼map就會阻塞寫入記憶體的操作,讓寫入磁碟操作完成後再繼續執行寫入記憶體操作;
5.4. 寫入磁碟時會有個排序操作,如果定義了combiner函式,那麼排序前還會執行combiner操作;
5.5. 每次spill操作也就是寫入磁碟操作時候就會寫一個溢位檔案,也就是說在做map輸出有幾次spill就會產生多少個溢位檔案,等map輸出全部做完後,map會合並這些輸出檔案,這個過程裡還會有一個Partitioner操作(如上)
5.6. 最後 reduce 就是合併map輸出檔案,Partitioner會找到對應的map輸出檔案,然後進行復制操作,複製操作時reduce會開啟幾個複製執行緒,這些執行緒默認個數是5個(可修改),這個複製過程和map寫入磁碟過程類似,也有閾值和記憶體大小,閾值一樣可以在配置檔案裡配置,而記憶體大小是直接使用reduce的tasktracker的記憶體大小,複製時候reduce還會進行排序操作和合並檔案操作,這些操作完了就會進行reduce計算了。 -
reduce階段:由我們自己編寫,最終結果儲存在hdfs上的。