shuffle(partitioner+combiner+sort)
阿新 • • 發佈:2018-12-20
shuffle(partitioner+combiner+sort)
- 每一個map有一個環形記憶體緩衝區,用於儲存任務的輸出。預設大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spill.percent),一個後臺執行緒把內容寫到(spill)磁碟的指定目錄(mapred.local.dir)下的新建的一個溢位寫檔案
- 寫磁碟前,要partition,sort。如果有combiner,combiner排序後資料。
- 等最後距離寫完,合併全部溢位寫檔案為一個分割槽且排序的檔案。
- reducer通過http方式得到輸出檔案的分割槽。
- TaskTracker為分割槽檔案執行Reduce任務。複製階段把Map輸出複製到Reducer的記憶體或磁碟。一個Map任務完畢,Reduce就開始複製輸出。
- 排序階段合併map輸出。然後走Reduce階段。
MR過程各個角色的作用
- jobClient:提交作業
- JobTracker:初始化作業,分配作業,TaskTracker與其進行通訊,協調監控整個作業
- TaskTracker:定期與JobTracker通訊,執行Map和Reduce任務
- HDFS:保留作業的資料、配置、jar包、結果
作業提交
- 提交作業之前,需要對作業進行配置 編寫自己的MR程式 配置作業,包括輸入輸出路徑等。。。
- 提交作業 配置完畢後,通過JobClient提交
- 具體功能:
- 與JobTracker通訊得到一個jar的儲存路徑和Jobld
- 輸入輸出路徑檢查
- 將jobjar拷貝到HDFS
- 計算輸入分片,將分片資訊寫到job.split中
- 寫job.xml
- 真正提交作業
作業初始化
- 客戶端提交作業後,JobTracker會將作業加入到佇列,然後進行排程,預設是FIFO方式
- 具體功能:
- 作業初始化只要是值JobInProgress中完成的
- 讀取分片資訊
- 任務建立task包扣Map和reduce任務
- 建立TaskInProgress執行task
任務分配
- tasktracker與jobtracker之間的通訊和任務分配是通過心跳機制實現的
- tasktracker會主動定期向jobtracker傳送心跳資訊,詢問是否有任務要做,如果有,就會申請到任務
- 心跳 定期 任務完成--領
任務執行
- 如果tasktracker拿到任務,會將所有的資訊拷貝到本地,包扣程式碼、配置、分片資訊等
- tasktracker中的localizeJob()方法會被條用進行本地化,拷貝job .jar,jobconf,job.xml到本地
- tasktracker呼叫launchTaskForJob()方法載入啟動任務
- MapTaskRunner和ReduceTaskRunner分別啟動javachild程序來執行相應的任務
狀態更新
- Task會定期向TaskTraker回報執行情況
- TaskTracker會定期收集所在叢集上的所有Task的資訊,並向JobTracker回報
- JobTracker會根據所有TaskTracker回報上來的資訊進行彙總
作業完成
- JobTracker是在接收到最後一個任務完成後,才將任務標記為成功
- 將紓解寫入到HDFS中
錯誤處理
- JobTracker失敗 存在單點故障,hadoop2.0解決了這個問題
- TaskTracker失敗 tasktracker崩潰了會停止向Jobtracker傳送心跳資訊,jobtracker會將tasktracker從等待的任務池中移除,並將該任務轉移到其他的地方執行,jobtracker將tasktracker加入到黑名單中
- Task失敗 任務失敗,會向TaskTracker丟擲異常 任務掛起
JobTracker
- 負責接收使用者提交的作業,負責啟動、跟蹤任務執行
- JobSubmissionProtocol是JobClient與JobTracker通訊的介面
- InterTrackerProtocol是TaskTracker與JobTracker通訊的介面
TaskTracker
- 負責執行任務
JobClient
- 是使用者作業與JobTracker互動的主要介面
- 負責提交作業的,負責啟動、跟蹤任務執行、訪問任務狀態和日誌等。
Partitioner程式設計
- partitioner是partitioner的基類,如果需要定製partitioner也需要繼承該類
- HashPartitioner是mapreduce的預設partitioner。計算方法是which reduce=(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks,得到當前的目的reducer
- mapper ----獲取資料
- partitioner ----分割槽 屬於shuffle
- reduce ---計算
public class TCPartitioner extends Partitioner<Text, TelBean>{
@Override
public int getPartition(Text key, TelBean bean, int arg2) {
// TODO Auto-generated method stub
}
}
把partitioner加入到job裡面
- 把partitioner新增到job裡面
- job.setPartitionerClass(TCPartitioner.class);
- 設定reduceTasks的數量 有幾個分割槽設定幾個任務
- job.setNumReduceTasks(2);
sort程式設計
繼承WritableComparable<> -序列化並且排序
public class Bean implements WritableComparable<Bean>{
//反序列化
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
}
//序列化
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
}
@Override
public int compareTo(Bean bean) {
// TODO Auto-generated method stub
}
}
combiner程式設計
- 每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合併,以減少傳輸到reducer的資料量
- combiner最基本是實現本地key的歸併,combiner具有類似本地的reduce功能
- 如果不用combiner,那麼,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度
- 注意:combiner的輸出是reduce的輸入,如果combiner是可插拔的,新增combiner絕對不能改變最終的計算結果。所以combiner只應該用於那種reduce的輸入kry/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。。。
- combiner就是map端的educer
- job.setCombinerClass();