mapreduce 作業中 map/reduce 個數的計算
1. 問題描述
當在hadoop叢集提交mapreduce作業時,map 和 reduce 的個數是如何計算的?
2. map個數的計算
2.1 map個數的計算和分片大小(splitSize)是有關係的,所以我們先看看splitSize的計算公式:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
2.2 計算公式引數說明及原始碼檢視:
minSize: Math.max(1, mapreduce.input.fileinputformat.split.minsize配置的大小 預設是1)
maxSize: mapreduce.input.fileinputformat.split.maxsize 配置的大小,預設為Long.MAX_VALUE
blockSize: hdfs快的大小,看叢集配置 e.g. 128M
當我們計算出splitSize後,遍歷 FileInputFormat.addInputPath 配置路徑中的每個檔案,如果檔案/ splitSize > 1.1 ,則取splitSize 作為一個map輸入分片,剩下的檔案大小 繼續判斷 檔案/ splitSize > 1.1 ,直到 <= 1.1 ;如果檔案/ splitSize <= 1.1 則 直接將該檔案作為一個map輸入分片 。 文字表述有點饒,直接上原始碼(FileInputFormat類):
程式碼很簡單,我大致列了8 個步驟,下面對8個步驟進行說明:
步驟1:計算minSize,見上面引數說明
步驟2: 計算maxSize ,見上面引數說明
步驟3: 遍歷FileInputFormat.addInputPath路徑中的檔案,即我們配置的資料來源
步驟4: 判斷檔案是否是 可拆分的檔案,如果檔案不可拆分,則上述的公式不可用,不可拆分的檔案包括壓縮檔案等
步驟5: 計算splitSize 大小,見上面 splitSize 計算公式
步驟6: while(剩餘檔案/ splitSize > 1.1) 備註:SPLIT_SLOP=1.1 ,如果成立,則splitSize 作為一個map的輸入,剩餘檔案大小:bytesRemaining -= splitSize ;
步驟7: 跳出步驟6中的while迴圈後,如果剩餘檔案不為0,則剩下的檔案單獨作為一個map的輸入
步驟8: 如果檔案是不可分的,則整個檔案都當作一個map的輸入
3. reduce個數的計算
3.1 決定reduce個數的引數配置
reduce的個數,可以直接在client 程式碼中設定 e.g. job.setNumReduceTasks(100) ; 即reduce的個數設定為100個,如果我們沒有手動配置了,該如何計算呢?
檢視原始碼 job.setNumReduceTasks(100),即配置 JobContext.NUM_REDUCES("mapreduce.job.reduce") 屬性為引數n,原始碼:public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } 即reduce的個數等於mapreduce.job.reduce 屬性value。該屬性的預設值為1,如果想要配置全域性值,可以在mapred-site.xml 檔案中配置。
4. 案例分析
4.1 輸入源( FileInputFormat.addInputPath)是按天 和 小時 分割槽的,格式是orc檔案(可拆分),一個小時拆分成2個檔案,每個檔案都是1M~20M 不等,mapreduce.input.fileinputformat.split.minsize 和mapreduce.input.fileinputformat.split.maxsize 都沒有配置,hdfs block size 為128M, 每次任務僅僅跑一天的資料。
根據上述公式 計算可得:
minSize =1;
maxSize = Long.MAX.VALUE;
blockSize = 134,217,728 (128M);
那splitSize = 128M;
因為一天的檔案有48個,因為每個檔案1M~20M,即每個檔案用一個map處理,即有48個map
4.2 mapreduce.job.reduce 在mapred-site.xml 檔案中配置為112 ,如下:
且client 程式碼中沒有手動設定setNumReduceTasks 數量,即reduce個數應該是112。 分析下後,檢視job 的詳細資訊截圖如下:
和上述分析的一致