1. 程式人生 > >MapReduce中如何控制mapper的數量

MapReduce中如何控制mapper的數量

很多文件中描述,Mapper的數量在預設情況下不可直接控制干預,因為Mapper的數量由輸入的大小和個數決定。在預設情況下,最終input佔據了多少block,就應該啟動多少個Mapper。如果輸入的檔案數量巨大,但是每個檔案的size都小於HDFS的blockSize,那麼會造成啟動的Mapper等於檔案的數量(即每個檔案都佔據了一個block),那麼很可能造成啟動的Mapper數量超出限制而導致崩潰。這些邏輯確實是正確的,但都是在預設情況下的邏輯。其實如果進行一些客戶化的設定,就可以控制了。

        在Hadoop中,設定Map task的數量不像設定Reduce task數量那樣直接,即:不能夠通過API直接精確的告訴Hadoop應該啟動多少個Map task。

       你也許奇怪了,在API中不是提供了介面org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)嗎?這個值難道不可以設定Map task的數量嗎?這個API的確沒錯,在文件上解釋”Note: This is only a hint to the framework.“,即這個值對Hadoop的框架來說僅僅是個提示,不起決定性的作用。也就是說,即便你設定了,也不一定得到你想要的效果。

1. InputFormat介紹

在具體設定Map task數量之前,非常有必要了解一下與Map-Reduce輸入相關的基礎知識。

這個介面(org.apache.hadoop.mapred.InputFormat)描述了Map-Reduce job的輸入規格說明(input-specification),它將所有的輸入檔案分割成邏輯上的InputSplit,每一個InputSplit將會分給一個單獨的mapper;它還提供RecordReader的具體實現,這個Reader從邏輯的InputSplit上獲取input records並傳給Mapper處理。

InputFormat有多種具體實現,諸如FileInputFormat(處理基於檔案的輸入的基礎抽象類), DBInputFormat(處理基於資料庫的輸入,資料來自於一個能用SQL查詢的表),KeyValueTextInputFormat

(特殊的FineInputFormat,處理Plain Text File,檔案由回車或者回車換行符分割成行,每一行由key.value.separator.in.input.line分割成Key和Value),CompositeInputFormat,DelegatingInputFormat等。在絕大多數應用場景中都會使用FileInputFormat及其子型別。

通過以上的簡單介紹,我們知道InputFormat決定著InputSplit,每個InputSplit會分配給一個單獨的Mapper,因此InputFormat決定了具體的Map task數量

2. FileInputFormat中影響Map數量的因素

在日常使用中,FileInputFormat是最常用的InputFormat,它有很多具體的實現。以下分析的影響Map數量的因素僅對FileInputFormat及其子類有效,其他非FileInputFormat可以去檢視相應的 getSplits(JobConf job, int numSplits) 具體實現即可。

請看如下程式碼段(摘抄自org.apache.hadoop.mapred.FileInputFormat.getSplits,hadoop-0.20.205.0原始碼):

 

[java] view plaincopy

  1. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);  
  2. long minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize);  
  3.   
  4. for (FileStatus file: files) {  
  5.   Path path = file.getPath();  
  6.   FileSystem fs = path.getFileSystem(job);  
  7.   if ((length != 0) && isSplitable(fs, path)) {   
  8.     long blockSize = file.getBlockSize();  
  9.     long splitSize = computeSplitSize(goalSize, minSize, blockSize);  
  10.       
  11.     long bytesRemaining = length;  
  12.     while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
  13.       String[] splitHosts = getSplitHosts(blkLocations,length-bytesRemaining, splitSize, clusterMap);  
  14.       splits.add(new FileSplit(path, length-bytesRemaining, splitSize, splitHosts));  
  15.       bytesRemaining -= splitSize;  
  16.     }  
  17.   
  18.     if (bytesRemaining != 0) {  
  19.       splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));  
  20.     }  
  21.   } else if (length != 0) {  
  22.     String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);  
  23.     splits.add(new FileSplit(path, 0, length, splitHosts));  
  24.   } else {   
  25.     //Create empty hosts array for zero length files  
  26.     splits.add(new FileSplit(path, 0, length, new String[0]));  
  27.   }  
  28. }  
  29.   
  30. return splits.toArray(new FileSplit[splits.size()]);  
  31.   
  32. protected long computeSplitSize(long goalSize, long minSize, long blockSize) {  
  33.     return Math.max(minSize, Math.min(goalSize, blockSize));  
  34. }  

totalSize:是整個Map-Reduce job所有輸入的總大小。

numSplits:來自job.getNumMapTasks(),即在job啟動時用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設定的值,給M-R框架的Map數量的提示。

goalSize:是輸入總大小與提示Map task數量的比值,即期望每個Mapper處理多少的資料,僅僅是期望,具體處理的資料數由下面的computeSplitSize決定。

minSplitSize:預設為1,可由子類複寫函式protected void setMinSplitSize(long minSplitSize) 重新設定。一般情況下,都為1,特殊情況除外

minSize:取的1和mapred.min.split.size中較大的一個。

blockSize:HDFS的塊大小,預設為64M,一般大的HDFS都設定成128M。

splitSize:就是最終每個Split的大小,那麼Map的數量基本上就是totalSize/splitSize。

接下來看看computeSplitSize的邏輯:首先在goalSize(期望每個Mapper處理的資料量)和HDFS的block size中取較小的,然後與mapred.min.split.size相比取較大的

3. 如何調整Map的數量

有了2的分析,下面調整Map的數量就很容易了。

3.1 減小Map-Reduce job 啟動時建立的Mapper數量

當處理大批量的大資料時,一種常見的情況是job啟動的mapper數量太多而超出了系統限制,導致Hadoop丟擲異常終止執行。解決這種異常的思路是減少mapper的數量。具體如下:

3.1.1 輸入檔案size巨大,但不是小檔案

這種情況可以通過增大每個mapper的input size,即增大minSize或者增大blockSize來減少所需的mapper的數量。增大blockSize通常不可行,因為當HDFS被hadoop namenode -format之後,blockSize就已經確定了(由格式化時dfs.block.size決定),如果要更改blockSize,需要重新格式化HDFS,這樣當然會丟失已有的資料。所以通常情況下只能通過增大minSize,即增大mapred.min.split.size的值。

 

3.1.2 輸入檔案數量巨大,且都是小檔案

所謂小檔案,就是單個檔案的size小於blockSize。這種情況通過增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat將多個input path合併成一個InputSplit送給mapper處理,從而減少mapper的數量。具體細節稍後會更新並展開。

 

3.2 增加Map-Reduce job 啟動時建立的Mapper數量

增加mapper的數量,可以通過減小每個mapper的輸入做到,即減小blockSize或者減小mapred.min.split.size的值。