hadoop 分片與分塊,map task和reduce task的理解
分塊:Block
HDFS儲存系統中,引入了檔案系統的分塊概念(block),塊是儲存的最小單位,HDFS定義其大小為64MB。與單磁碟檔案系統相似,儲存在 HDFS上的檔案均儲存為多個塊,不同的是,如果某檔案大小沒有到達64MB,該檔案也不會佔據整個塊空間。在分散式的HDFS叢集上,Hadoop系統保證一個塊儲存在一個datanode上。
把File劃分成Block,這個是物理上真真實實的進行了劃分,資料檔案上傳到HDFS裡的時候,需要劃分成一塊一塊,每塊的大小由hadoop-default.xml裡配置選項進行劃分。一個大檔案可以把劃分後的所有塊儲存到同一個磁碟上,也可以在每個磁碟上都存在這個檔案的分塊。
這個就是預設的每個塊64M:
<property> <name>dfs.block.size</name> <value>67108864</value> <description>The default block size for new files.</description> </property>
資料劃分的時候有冗餘,即進行備份,個數是由以下配置指定的。具體的物理劃分步驟由Namenode決定。
1 <property> 2 <name>dfs.replication</name> 3 <value>3</value> 4 <description>Default block replication. 5 The actual number of replications can be specified when the file is created. 6 The default is used if replication is not specified in create time. 7 </description>8 </property>
分片:splits
由InputFormat這個介面來定義的,其中有個getSplits方法。這裡有一個新的概念:fileSplit。每個map處理一個fileSplit,所以有多少個fileSplit就有多少個map(map數並不是單純的由使用者設定決定的)。
我們來看一下hadoop分配splits的原始碼:
1 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); 2 long minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize); 3 4 for (FileStatus file: files) { 5 Path path = file.getPath(); 6 FileSystem fs = path.getFileSystem(job); 7 if ((length != 0) && isSplitable(fs, path)) { 8 long blockSize = file.getBlockSize(); 9 long splitSize = computeSplitSize(goalSize, minSize, blockSize); 10 11 long bytesRemaining = length; 12 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 13 String[] splitHosts = getSplitHosts(blkLocations,length-bytesRemaining, splitSize, clusterMap); 14 splits.add(new FileSplit(path, length-bytesRemaining, splitSize, splitHosts)); 15 bytesRemaining -= splitSize; 16 } 17 18 if (bytesRemaining != 0) { 19 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); 20 } 21 } else if (length != 0) { 22 String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); 23 splits.add(new FileSplit(path, 0, length, splitHosts)); 24 } else { 25 //Create empty hosts array for zero length files 26 splits.add(new FileSplit(path, 0, length, new String[0])); 27 } 28 } 29 30 return splits.toArray(new FileSplit[splits.size()]); 31 32 protected long computeSplitSize(long goalSize, long minSize, long blockSize) { 33 return Math.max(minSize, Math.min(goalSize, blockSize)); 34 }
totalSize:是整個Map-Reduce job所有輸入的總大小。
numSplits:來自job.getNumMapTasks(),即在job啟動時用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設定的值,給M-R框架的Map數量的提示。
goalSize:是輸入總大小與提示Map task數量的比值,即期望每個Mapper處理多少的資料,僅僅是期望,具體處理的資料數由下面的computeSplitSize決定。
minSplitSize:預設為1,可由子類複寫函式protected void setMinSplitSize(long minSplitSize) 重新設定。一般情況下,都為1,特殊情況除外。
minSize:取的1和mapred.min.split.size中較大的一個。
blockSize:HDFS的塊大小,預設為64M,一般大的HDFS都設定成128M。
splitSize:就是最終每個Split的大小,那麼Map的數量基本上就是totalSize/splitSize。
接下來看看computeSplitSize的邏輯:首先在goalSize(期望每個Mapper處理的資料量)和HDFS的block size中取較小的,然後與mapred.min.split.size相比取較大的。
一個片為一個splits,即一個map,只要搞清楚片的大小,就能計算出執行時的map數。而一個split的大小是由goalSize, minSize, blockSize這三個值決定的。computeSplitSize的邏輯是,先從goalSize和blockSize兩個值中選出最小的那個(比如一般不設定map數,這時blockSize為當前檔案的塊size,而goalSize是檔案大小除以使用者設定的map數得到的,如果沒設定的話,預設是1),在預設的大多數情況下,blockSize比較小。然後再取blockSize和minSize中最大的那個。而minSize如果不通過”mapred.min.split.size”設定的話(”mapred.min.split.size”預設為0),minSize為1,這樣得出的一個splits的size就是blockSize,即一個塊一個map,有多少塊就有多少map。
input_file_num : 輸入檔案的個數 (1)預設map個數 如果不進行任何設定,預設的map個數是和blcok_size相關的。 default_num = total_size / block_size; (2)期望大小 可以通過引數 mapred.map.tasks來設定程式設計師期望的map個數,但是這個個數只有在大於default_num的時候,才會生效。 goal_num =mapred.map.tasks; (3)設定處理的檔案大小 可以通過mapred.min.split.size 設定每個task處理的檔案大小,但是這個大小隻有在大於 block_size的時候才會生效。 split_size = max( mapred.min.split.size, block_size);split_num = total_size / split_size; (4)計算的map個數 compute_map_num = min(split_num, max(default_num, goal_num)) 除了這些配置以外,mapreduce還要遵循一些原則。 mapreduce的每一個map處理的資料是不能跨越檔案的,也就是說max_map_num <= input_file_num。 所以,最終的map個數應該為: final_map_num = min(compute_map_num, input_file_num) 經過以上的分析,在設定map個數的時候,可以簡單的總結為以下幾點: (1)如果想增加map個數,則設定mapred.map.tasks 為一個較大的值。 (2)如果想減小map個數,則設定mapred.min.split.size 為一個較大的值。
map task
如何調整map數量:
有了2的分析,下面調整Map的數量就很容易了。
減小Map-Reduce job 啟動時建立的Mapper數量
當處理大批量的大資料時,一種常見的情況是job啟動的mapper數量太多而超出了系統限制,導致Hadoop丟擲異常終止執行。解決這種異常的思路是減少mapper的數量。具體如下:
輸入檔案size巨大,但不是小檔案
這種情況可以通過增大每個mapper的input size,即增大minSize或者增大blockSize來減少所需的mapper的數量。增大blockSize通常不可行,因為當HDFS被hadoop namenode -format之後,blockSize就已經確定了(由格式化時dfs.block.size決定),如果要更改blockSize,需要重新格式化HDFS,這樣當然會丟失已有的資料。所以通常情況下只能通過增大minSize,即增大mapred.min.split.size的值。
輸入檔案數量巨大,且都是小檔案
所謂小檔案,就是單個檔案的size小於blockSize。這種情況通過增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat將多個input path合併成一個InputSplit送給mapper處理,從而減少mapper的數量。具體細節稍後會更新並展開。
增加Map-Reduce job 啟動時建立的Mapper數量
增加mapper的數量,可以通過減小每個mapper的輸入做到,即減小blockSize或者減小mapred.min.split.size的值。
參考資料: