1. 程式人生 > >Hadoop作業初始化過程

Hadoop作業初始化過程

排程器呼叫JobTracker.initJob();函式對新作業進行初始化,作業初始化的主要工作是構造Map Task和Reduce Task並對它們進行初始化。

hadoop將每一個作業分解成4種類型的任務,分別是Setup Task、Map Task、Reduce Task和Cleanup Task .它們的執行時資訊由TaskInprogress類維護。因此,建立這些任務實際上是建立TaskInProgress物件。
這裡寫圖片描述

4中任務的作用以及建立過程如下:

1、Setup Task :作業初始化標識性任務,它進行一些非常簡單的作業初始化工作,比如將作業狀態設定為”setup”,呼叫OutputCommitter.setupjob()函式等。該任務執行完後,作業由PREP狀態變為RUNNING狀態。並開始執行Map Task。該型別任務又被分為Map Setup Task和Reduce Setup Task兩種,且每個作業各有一個,他們執行時分別佔用一個Map slot和reduce slot。由於這兩種任務功能相同,因此有且只有一個可以獲取執行的機會(即只要有一個開始執行,另一個馬上被kill掉,而具體哪一個能夠執行,取決於當前存在的空閒slot種類以排程策略)。

建立該類任務的相關程式碼如下:JobInProgess

// create two setup tips, one map and one reduce.

    setup = new TaskInProgress[2];

    // setup map tip. This map doesn't use any split. Just assign an empty split.

    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 

            jobtracker, conf, this, numMapTasks + 1
, 1); setup[0].setJobSetupTask(); // setup reduce tip. setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this, 1); setup[1].setJobSetupTask();

2、Map Task:map階段處理資料的任務,其數目及對應的處理資料分片由應用程式中的InputFormat元件確定。

相關程式碼如下:

TaskSplitMetaInfo[] splits = createSplits(jobId);

    if (numMapTasks != splits.length) {

      throw new IOException("Number of maps in JobConf doesn't match number of " +

          "recieved splits for job " + jobId + "! " +

          "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);

    }

    numMapTasks = splits.length;



    // Sanity check the locations so we don't create/initialize unnecessary tasks

    for (TaskSplitMetaInfo split : splits) {

      NetUtils.verifyHostnames(split.getLocations());

    }

    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);

    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);

    this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);

    this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);

    maps = new TaskInProgress[numMapTasks];

    for(int i=0; i < numMapTasks; ++i) {

      inputLength += splits[i].getInputDataLength();

      maps[i] = new TaskInProgress(jobId, jobFile, 

                                   splits[i], 

                                   jobtracker, conf, this, i, numSlotsPerMap);

    }

3、Reduce Task Reduce階段處理資料的任務,其數目有使用者通過引數Mapred.reduce.tasks(預設數目為1)指定,考慮到Reduce Task能否執行依賴於Map Task的輸出結果,因此,hadoop剛開始只會排程Map Task,直到Map Task完成數目達到一定比例(由引數Mapred.reduce.slowstart.completed.maps指定,預設是0.05,即5%)

相關程式碼如 // Create reduce tasks

 this.reduces = new TaskInProgress[numReduceTasks];

for (int i = 0; i < numReduceTasks; i++) {

  reduces[i] = new TaskInProgress(jobId, jobFile, 

                                  numMapTasks, i, 

                                  jobtracker, conf, this, numSlotsPerReduce);

  nonRunningReduces.add(reduces[i]);

}

4、Cleanup Task:作業結束標誌性任務,主要完成一些作業清理工作,比如刪除作業執行過程中用到的一些臨時目錄(比如 Temporary目錄)。一旦該任務執行成功後,作業由RUNNING狀態變為SUCCEEDED狀態。這個和Setup Task 類似 會建立2個Task 一個Map Cleanup Task和Reduce Cleanup Task

程式碼如下:

// create cleanup two cleanup tips, one map and one reduce.

    cleanup = new TaskInProgress[2];

    // cleanup map tip. This map doesn't use any splits. Just assign an empty

    // split.

    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;

    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 

            jobtracker, conf, this, numMapTasks, 1);

    cleanup[0].setJobCleanupTask();

    // cleanup reduce tip.

    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

                       numReduceTasks, jobtracker, conf, this, 1);

    cleanup[1].setJobCleanupTask();

有人發現引入Setup/Cleanup Task會拖慢作業執行進度且降低作業的可靠性,這主要是因為hadoop除了需要保證每個Map/Reduce Task執行成功外,還要保證Setup/Cleanup Task成功。對於Map/Reduce Task而言,可通過推測執行機制避免出現”拖後腿”的任務。然而,由於Setup/Cleanup Task不會處理任何資料,兩種任務的進度只有0%和100%兩個值,從而使得推測式任務機制對之不適用,為了解決這個問題,從0.21.0版本開始,hadoop將是否開啟Setup/Cleanup Task 變成了可配置的選項,使用者可通過引數 mapred.committer.job.set.cleanup.needed配置是否為作業建立Setup/Cleanup Task