1. 程式人生 > >shuffle(partitioner+combiner+sort)

shuffle(partitioner+combiner+sort)

shuffle(partitioner+combiner+sort)

  1. 每一個map有一個環形記憶體緩衝區,用於儲存任務的輸出。預設大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spill.percent),一個後臺執行緒把內容寫到(spill)磁碟的指定目錄(mapred.local.dir)下的新建的一個溢位寫檔案
  2. 寫磁碟前,要partition,sort。如果有combiner,combiner排序後資料。
  3. 等最後距離寫完,合併全部溢位寫檔案為一個分割槽且排序的檔案。
  4. reducer通過http方式得到輸出檔案的分割槽。
  5. TaskTracker為分割槽檔案執行Reduce任務。複製階段把Map輸出複製到Reducer的記憶體或磁碟。一個Map任務完畢,Reduce就開始複製輸出。
  6. 排序階段合併map輸出。然後走Reduce階段。

MR過程各個角色的作用

  1. jobClient:提交作業
  2. JobTracker:初始化作業,分配作業,TaskTracker與其進行通訊,協調監控整個作業
  3. TaskTracker:定期與JobTracker通訊,執行Map和Reduce任務
  4. HDFS:保留作業的資料、配置、jar包、結果

作業提交

  1. 提交作業之前,需要對作業進行配置   編寫自己的MR程式   配置作業,包括輸入輸出路徑等。。。
  2. 提交作業   配置完畢後,通過JobClient提交
  3. 具體功能:
  4. 與JobTracker通訊得到一個jar的儲存路徑和Jobld
  5. 輸入輸出路徑檢查
  6. 將jobjar拷貝到HDFS
  7. 計算輸入分片,將分片資訊寫到job.split中
  8. 寫job.xml
  9. 真正提交作業

作業初始化

  1. 客戶端提交作業後,JobTracker會將作業加入到佇列,然後進行排程,預設是FIFO方式
  2. 具體功能:
  3. 作業初始化只要是值JobInProgress中完成的
  4. 讀取分片資訊
  5. 任務建立task包扣Map和reduce任務
  6. 建立TaskInProgress執行task

任務分配

  1. tasktracker與jobtracker之間的通訊和任務分配是通過心跳機制實現的
  2. tasktracker會主動定期向jobtracker傳送心跳資訊,詢問是否有任務要做,如果有,就會申請到任務
  3. 心跳  定期   任務完成--領

任務執行

  1. 如果tasktracker拿到任務,會將所有的資訊拷貝到本地,包扣程式碼、配置、分片資訊等
  2. tasktracker中的localizeJob()方法會被條用進行本地化,拷貝job .jar,jobconf,job.xml到本地
  3. tasktracker呼叫launchTaskForJob()方法載入啟動任務
  4. MapTaskRunner和ReduceTaskRunner分別啟動javachild程序來執行相應的任務

狀態更新

  1. Task會定期向TaskTraker回報執行情況
  2. TaskTracker會定期收集所在叢集上的所有Task的資訊,並向JobTracker回報
  3. JobTracker會根據所有TaskTracker回報上來的資訊進行彙總

作業完成

  1. JobTracker是在接收到最後一個任務完成後,才將任務標記為成功
  2. 將紓解寫入到HDFS中

錯誤處理

  1. JobTracker失敗  存在單點故障,hadoop2.0解決了這個問題
  2. TaskTracker失敗  tasktracker崩潰了會停止向Jobtracker傳送心跳資訊,jobtracker會將tasktracker從等待的任務池中移除,並將該任務轉移到其他的地方執行,jobtracker將tasktracker加入到黑名單中
  3. Task失敗  任務失敗,會向TaskTracker丟擲異常   任務掛起

JobTracker

  1. 負責接收使用者提交的作業,負責啟動、跟蹤任務執行
  2. JobSubmissionProtocol是JobClient與JobTracker通訊的介面
  3. InterTrackerProtocol是TaskTracker與JobTracker通訊的介面

TaskTracker

  1. 負責執行任務

JobClient

  1. 是使用者作業與JobTracker互動的主要介面
  2. 負責提交作業的,負責啟動、跟蹤任務執行、訪問任務狀態和日誌等。

Partitioner程式設計

  1. partitioner是partitioner的基類,如果需要定製partitioner也需要繼承該類
  2. HashPartitioner是mapreduce的預設partitioner。計算方法是which reduce=(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks,得到當前的目的reducer
  3. mapper ----獲取資料
  4. partitioner  ----分割槽  屬於shuffle
  5. reduce   ---計算
public class TCPartitioner extends Partitioner<Text, TelBean>{
    
    @Override
    public int getPartition(Text key, TelBean bean, int arg2) {
        // TODO Auto-generated method stub
    
    }
}

把partitioner加入到job裡面

  1. 把partitioner新增到job裡面
  2. job.setPartitionerClass(TCPartitioner.class);
  3. 設定reduceTasks的數量  有幾個分割槽設定幾個任務
  4. job.setNumReduceTasks(2);

sort程式設計

繼承WritableComparable<> -序列化並且排序

public class Bean implements WritableComparable<Bean>{
   
 //反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
    }
   
 //序列化
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
    }
   
 @Override
    public int compareTo(Bean bean) {
        // TODO Auto-generated method stub
    }
}

combiner程式設計

  1. 每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合併,以減少傳輸到reducer的資料量
  2. combiner最基本是實現本地key的歸併,combiner具有類似本地的reduce功能
  3. 如果不用combiner,那麼,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度
  4. 注意:combiner的輸出是reduce的輸入,如果combiner是可插拔的,新增combiner絕對不能改變最終的計算結果。所以combiner只應該用於那種reduce的輸入kry/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。。。
  5. combiner就是map端的educer
  6. job.setCombinerClass();