Hadoop叢集Mapreduce原理篇
Mapreduce是一個分散式運算程式的程式設計框架,是使用者開發“基於hadoop的資料分析應用”的核心框架;
Mapreduce
核心功能
是將使用者編寫的業務邏輯程式碼和自帶預設元件整合成一個完整的分散式運算程式,併發執行在一個hadoop叢集上;
1 為什麼要MAPREDUCE
(1)海量資料在單機上處理因為硬體資源限制,無法勝任
(2)而一旦將單機版程式擴充套件到叢集來分散式執行,將極大增加程式的複雜度和開發難度
(3)引入mapreduce框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分散式計算中的複雜性交由框架來處理
設想一個海量資料場景下的wordcount需求:
單機版:記憶體受限,磁碟受限,運算能力受限 分散式: 1、檔案分散式儲存(HDFS) 2、運算邏輯需要至少分成2個階段(一個階段獨立併發,一個階段匯聚) 3、運算程式如何分發 4、程式如何分配運算任務(切片) 5、兩階段的程式如何啟動?如何協調? 6、整個程式執行過程中的監控?容錯?重試? |
可見在程式由單機版擴成分散式時,會引入大量的複雜工作。為了提高開發效率,可以將分散式程式中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。
而mapreduce就是這樣一個分散式程式的通用框架,其應對以上問題的整體結構如下:
1、MRAppMaster(mapreduce application master) 2、MapTask 3、ReduceTask |
2、 MAPREDUCE框架結構及核心執行機制
2.1 結構
一個完整的mapreduce程式在分散式執行時有三類例項程序:
1、MRAppMaster:負責整個程式的過程排程及狀態協調
2、mapTask:負責map階段的整個資料處理流程
3、ReduceTask:負責reduce階段的整個資料處理流程
2.2 程式執行流程
流程示意圖
.
流程解析
1、 一個mr程式啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動後根據本次job的描述資訊,計算出需要的maptask例項數量,然後向叢集申請機器啟動相應數量的maptask程序
2、 maptask程序啟動之後,根據給定的資料切片範圍進行資料處理,主體流程為:
a) 利用客戶指定的inputformat來獲取RecordReader讀取資料,形成輸入KV對
b) 將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,並將map()方法輸出的KV對收集到快取
c) 將快取中的KV對按照K分割槽排序後不斷溢寫到磁碟檔案
3、 MRAppMaster監控到所有maptask程序任務完成之後,會根據客戶指定的引數啟動相應數量的reducetask程序,並告知reducetask程序要處理的資料範圍(資料分割槽)
4、 Reducetask程序啟動之後,根據MRAppMaster告知的待處理資料所在位置,從若干臺maptask執行所在機器上獲取到若干個maptask輸出結果檔案,並在本地進行重新歸併排序,然後按照相同key的KV為一個組,呼叫客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,然後呼叫客戶指定的outputformat將結果資料輸出到外部儲存
3、並行度決定機制
maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度
那麼,mapTask並行例項是否越多越好呢?其並行度又是如何決定呢?
mapTask並行度的決定機制
一個job的map階段並行度由客戶端在提交job時決定
而客戶端對map階段並行度的規劃的基本邏輯為:
將待處理資料執行邏輯切片(即按照一個特定切片大小,將待處理資料劃分成邏輯上的多個split),然後每一個split分配一個mapTask並行例項處理
這段邏輯及形成的切片規劃描述檔案,由FileInputFormat實現類的getSplits()方法完成,其過程如下圖:
FileInputFormat切片機制
1、切片定義在InputFormat類中的getSplit()方法
2、FileInputFormat中預設的切片機制:
a) 簡單地按照檔案的內容長度進行切片
b) 切片大小,預設等於block大小
c) 切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片
比如待處理資料有兩個檔案:
file1.txt 320M file2.txt 10M |
經過FileInputFormat的切片機制運算後,形成的切片資訊如下:
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M |
3、FileInputFormat中切片的大小的引數配置
通過分析原始碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize,blockSize)); 切片主要由這幾個值來運算決定
minsize:預設值:1 配置引數: mapreduce.input.fileinputformat.split.minsize |
maxsize:預設值:Long.MAXValue 配置引數:mapreduce.input.fileinputformat.split.maxsize |
blocksize |
因此,預設情況下,切片大小=blocksize
maxsize(切片最大值):
引數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個引數的值
minsize (切片最小值):
引數調的比blockSize大,則可以讓切片變得比blocksize還大
選擇併發數的影響因素:
1、運算節點的硬體配置
2、運算任務的型別:CPU密集型還是IO密集型
3、運算任務的資料量
4、 map並行度的經驗之談
如果硬體配置為2*12core+ 64G,恰當的map並行度是大約每個節點20-100個map,最好每個map的執行時間至少一分鐘。
l 如果job的每個map或者 reducetask的執行時間都只有30-40秒鐘,那麼就減少該job的map或者reduce數,每一個task(map|reduce)的setup和加入到排程器中進行排程,這個中間的過程可能都要花費幾秒鐘,所以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
配置task的JVM重用[dht1] 可以改善該問題:
(mapred.job.reuse.jvm.num.tasks,預設是1,表示一個JVM上最多可以順序執行的task
數目(屬於同一個Job)是1。也就是說一個task啟一個JVM)
l 如果input的檔案非常的大,比如1TB,可以考慮將hdfs上的每個block size設大,比如設成256MB或者512MB
5、 ReduceTask並行度的決定
reducetask的並行度同樣影響整個job的執行併發度和執行效率,但與maptask的併發數由切片數決定不同,Reducetask數量的決定是可以直接手動設定:
//預設值是1,手動設定為4
job.setNumReduceTasks(4);
如果資料分佈不均勻,就有可能在reduce階段產生資料傾斜
注意:reducetask數量並不是任意設定,還要考慮業務邏輯需求,有些情況下,需要計算全域性彙總結果,就只能有1個reducetask
儘量不要執行太多的reduce task。對大多數job來說,最好rduce的個數最多和叢集中的reduce持平,或者比叢集的reduce slots小。這個對於小叢集而言,尤其重要。
6、 MAPREDUCE程式執行演示
Hadoop的釋出包中內建了一個hadoop-mapreduce-example-2.4.1.jar,這個jar包中有各種MR示例程式,可以通過以下步驟執行:
啟動hdfs,yarn
然後在叢集中的任意一臺伺服器上啟動執行程式(比如執行wordcount):
hadoopjar hadoop-mapreduce-example-2.4.1.jar wordcount /wordcount/data /wordcount/out
長按識別關注我們,收看更多精彩內容!