MapReduce並行度機制
阿新 • • 發佈:2018-09-30
最小值 blocks 結束 完成 多個 its get file 執行時間 1. MapTask並行度機制
MapTask的並行度指的是map階段有多少個並行的task共同處理任務。map階段的任務處理並行度,勢必影響到整個job的處理速度。那麽,MapTask並行實例是否越多越好呢?其並行度又是如何決定呢?
一個MapReducejob的map階段並行度由客戶端在提交job時決定,即客戶端提交job之前會對待處理數據進行邏輯切片。切片完成會形成切片規劃文件(job.split),每個邏輯切片最終對應啟動一個maptask。
邏輯切片機制由FileInputFormat實現類的getSplits()方法完成。
FileInputFormat切片機制
FileInputFormat中默認的切片機制:
A. 簡單地按照文件的內容長度進行切片
B. 切片大小,默認等於block大小
C. 切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
比如待處理數據有兩個文件:
file1.txt 320M
file2.txt 10M
經過FileInputFormat的切片機制運算後,形成的切片信息如下:
file1.txt.split1—0M~128M
file1.txt.split2—128M~256M
file1.txt.split3—256M~320M
file2.txt.split1—0M~10M Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個值來運算決定:
minsize:默認值:1
配置參數: mapreduce.input.fileinputformat.split.minsize
maxsize:默認值:Long.MAXValue
配置參數:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默認情況下,split size=blocksize,在hadoop 2.x中為128M。
maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個參數的。
minsize (切片最小值):參數調的比blockSize大,則可以讓切片變得比blocksize還大。
但是,不論怎麽調參數,都不能讓多個小文件“劃入”一個split。
還有個細節就是:
當bytesRemaining/splitSize > 1.1不滿足的話,那麽最後所有剩余的會作為一個切片。從而不會形成例如129M文件規劃成兩個切片的局面。
MapTask的並行度指的是map階段有多少個並行的task共同處理任務。map階段的任務處理並行度,勢必影響到整個job的處理速度。那麽,MapTask並行實例是否越多越好呢?其並行度又是如何決定呢?
一個MapReducejob的map階段並行度由客戶端在提交job時決定,即客戶端提交job之前會對待處理數據進行邏輯切片。切片完成會形成切片規劃文件(job.split),每個邏輯切片最終對應啟動一個maptask。
邏輯切片機制由FileInputFormat實現類的getSplits()方法完成。
FileInputFormat切片機制
FileInputFormat中默認的切片機制:
B. 切片大小,默認等於block大小
C. 切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
比如待處理數據有兩個文件:
file1.txt 320M
file2.txt 10M
經過FileInputFormat的切片機制運算後,形成的切片信息如下:
file1.txt.split1—0M~128M
file1.txt.split2—128M~256M
file1.txt.split3—256M~320M
file2.txt.split1—0M~10M
FileInputFormat中切片的大小的參數配置
在FileInputFormat中,計算切片大小的邏輯:
切片主要由這幾個值來運算決定:
minsize:默認值:1
配置參數: mapreduce.input.fileinputformat.split.minsize
maxsize:默認值:Long.MAXValue
配置參數:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默認情況下,split size=blocksize,在hadoop 2.x中為128M。
maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個參數的。
但是,不論怎麽調參數,都不能讓多個小文件“劃入”一個split。
還有個細節就是:
當bytesRemaining/splitSize > 1.1不滿足的話,那麽最後所有剩余的會作為一個切片。從而不會形成例如129M文件規劃成兩個切片的局面。
2. Reducetask並行度機制
reducetask並行度同樣影響整個job的執行並發度和執行效率,與maptask的並發數由切片數決定不同,Reducetask數量的決定是可以直接手動設置:
job.setNumReduceTasks(4);
如果數據分布不均勻,就有可能在reduce階段產生數據傾斜。
註意: reducetask數量並不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有1個reducetask。
3. Task並行度經驗之談
最好每個task的執行時間至少一分鐘。
如果job的每個map或者 reduce task的運行時間都只有30-40秒鐘,那麽就減少該job的map或者reduce數,每一個task(map|reduce)的setup和加入到調度器中進行調度,這個中間的過程可能都要花費幾秒鐘,所以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
此外,默認情況下,每一個task都是一個新的JVM實例,都需要開啟和銷毀的開銷。在一些情況下,JVM開啟和銷毀的時間可能會比實際處理數據的時間要消耗的長,配置task的JVM重用可以改善該問題:
(mapred.job.reuse.jvm.num.tasks,默認是1,表示一個JVM上最多可以順序執行的task數目(屬於同一個Job)是1。也就是說一個task啟一個JVM)
如果input的文件非常的大,比如1TB,可以考慮將hdfs上的每個block size設大,比如設成256MB或者512MB
MapReduce並行度機制