1. 程式人生 > >hadoop2 作業執行過程之map過程

hadoop2 作業執行過程之map過程

在執行MAP任務之前,先了解一下它的容器和它容器的領導:container和nodemanager

NodeManager

NodeManager(NM)是YARN中每個節點上的代理,它管理Hadoop叢集中的單個計算節點,包括與ResourceManager保持通訊,監督Container的生命週期管理,監控每個Container的資源使用(記憶體、CPU等)情況,追蹤節點健康狀況,管理日誌和不同應用程式用到的附屬服務(auxiliary service)

它包含以下幾大元件:

1.NodeStatusUpdater

當NM啟動時,該元件向RM註冊,併發送節點上的可用資源。接下來,NM與RM通訊,彙報Container的狀態更新,包括節點上正在執行的Container、已完成的Container等,此外,RM可能向NodeStatusUpdater發訊號,殺死處於執行中的Container

NodeStatusUpdater是NM和RM通訊的唯一通道,週期性地呼叫RPC函式nodeHeartbeat()向RM彙報節點上的各種資訊

2.ContainerManager

它是NM中的核心元件,實現類是ContainerManagerImpl。它有幾個元件組成,各自負責一部分功能,意管理執行在該節點上的所有Container

2.1RPC Server

它負責從AM上接收RPC請求以啟動Container或者停止。

供AM使用的介面分別是:startContainer()、stopContainer()、getContainerStatus()

2.2ResourceLocalizationService

負責(從HDFS)安全下載(採用多執行緒)和組織Container需要的各種檔案資源。

2.3ContainerLauncher

維護了一個執行緒池,隨時準備並在必要時儘快啟動Container。同時會接收來自RM或者AM的清理Container請求,清理相應程序

2.4AuxServices

NM提供了一個框架以通過配置附屬服務擴充套件自己的功能,這些服務是與NM其他服務隔離開的。

2.5ContainerMonitor

當一個Container啟動以後,該元件便開始觀察它在執行過程中的資源利用。NM啟動一個Container後,ContainerMonitor會將改Container程序對應的一個pid新增到監控列表中

2.6LogHandler

一個可插拔元件,使用者通過它可以宣傳將Container日誌寫到本地磁碟或者打包上傳到一個檔案系統中。

3.ContainerExecutor

與地產作業系統互動,安全存放Container需要的檔案和目錄,進而以以一種安全的方式啟動或者清除Container程序

4.NodeHealthCheckerService

週期性地執行一個配置好的腳步檢查節點的健康狀況,任何系統健康方面的改變都會通知NodeStatusUpdater傳遞給RM

5.Security

5.1ApplicationACLsManager 為所有面向用戶的API提供安全檢查

5.2ContainerTokenSecretManager 檢查收到各種請求的合法性,確保這些請求已被RM授權

6.WebServer

web展示

Container

Container的概念

首先它和Linux的Container完全不同。它的使用是啟動AM的時候和執行Task的時候,但是它的涉及則是RM向資源排程器申請啟動AM的資源時和AM向RM的資源排程器申請啟動Task資源時;

當向資源排程器申請資源時,需向它傳送一個ResourceRequest列表,其中描述了一個資源單元的詳細請求,而資源排程器為之返回分配的資源來描述Container。每個ResourceRequest可看做一個可序列化的Java物件

複製程式碼
message ResourceRequestProto {
  optional PriorityProto priority = 1; // 資源優先順序
  optional string resource_name = 2; // 資源名稱(期望資源所在的host、rack名稱等)
  optional ResourceProto capability = 3; // 資源量(僅支援CPU和記憶體兩種資源)
  optional int32 num_containers = 4; // 滿足以上條件的資源個數
  optional bool relax_locality = 5 [default = true];  //是否支援本地性鬆弛
}
複製程式碼

這些資源預設是本地鬆弛的,即申請優先順序為10,資源名稱為“node11”,資源量為<2GB,1CPU>的5份資源時,如果node11上沒有滿足要求的資源,則優先找node11同機架上其他資源,繼而找其他機架

AM收到一個或者多個Container後,再次將改Container進一步分配給內部的某個任務,一旦確定任務後,AM需將任務執行環境(包括執行命令、環境變數、依賴的外部檔案等)連同Container中的資源封裝到ContainerLaunchContext物件中,進而與對應的NM通訊,以啟動該任務

複製程式碼
message ContainerLaunchContextProto {
  repeated StringLocalResourceMapProto localResources = 1; //Container啟動以來的外部資源
  optional bytes tokens = 2;
  repeated StringBytesMapProto service_data = 3;
  repeated StringStringMapProto environment = 4; //Container啟動所需的環境變數
  repeated string command = 5; //Container內部執行的任務啟動命令,如果是MapReduce的話,Map/Reduce Task啟動命令就在該欄位中
  repeated ApplicationACLMapProto application_ACLs = 6;
}
複製程式碼

Container啟動步驟:

1.資源本地化

在本地拷貝一份執行Container所需的所有資源(通過Distributed Cache實現);

為Container建立經隔離的工作目錄,並在這些目錄中準備好所有資源;

YARN將資源分為兩類:一類是public級別的資源,放在公共目錄下,由所有使用者共享,另一類是private級別的資源,這類資源時使用者私有的,只能在所屬使用者的各個作業間共享。

2.啟動Container

啟動Container是由ContainerLauncher完成的;

3.執行Container

由ContainerExecutor完成

4.資源回收

由ResourceLocalizationService服務完成,該過程與資源本地化正好相反,它負責撤銷Container執行過程中使用的各種資源。

MAP

mapper就是在執行Container的時候執行的。主角上場。

Map任務是一類將輸入記錄轉換為中間格式記錄集的獨立任務。Mapper類中的map方法將輸入鍵值對對映到一組中間格式的鍵值對集合

Container啟動以後會根據AM傳過來的任務資訊啟動一個YarnChild程序來執行任務,YarnChild直接呼叫分給它的jvmTask,而jvmTask則判斷是map任務還是reduce任務來分別執行MapTask和ReduceTask來執行Map過程和Reduce過程

每個task都會使用一個程序佔用一個JVM來執行,org.apache.hadoop.mapred.Child方法是具體的JVM啟動類

taskFinal.run(job, umbilical); // run the task
複製程式碼
if (taskComing) {
      boolean isMap = in.readBoolean();
      if (isMap) {
        t = new MapTask();
      } else {
        t = new ReduceTask();
      }
      t.readFields(in);
    }
複製程式碼

這裡的taskFinal就是jvmTask

自定義的Map類繼承自Mapper,由MapTask的run()方法來執行

複製程式碼
 1   @Override
 2   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
 3     throws IOException, ClassNotFoundException, InterruptedException {
 4     this.umbilical = umbilical;
 5 
 6     if (isMapTask()) {
 7       // If there are no reducers then there won't be any sort. Hence the map 
 8       // phase will govern the entire attempt's progress.
 9       if (conf.getNumReduceTasks() == 0) {
10         mapPhase = getProgress().addPhase("map", 1.0f);
11       } else {
12         // If there are reducers then the entire attempt's progress will be 
13         // split between the map phase (67%) and the sort phase (33%).
14         mapPhase = getProgress().addPhase("map", 0.667f);
15         sortPhase  = getProgress().addPhase("sort", 0.333f);
16       }
17     }
18     TaskReporter reporter = startReporter(umbilical);
19  
20     boolean useNewApi = job.getUseNewMapper();
21     initialize(job, getJobID(), reporter, useNewApi);
22 
23     // check if it is a cleanupJobTask
24     if (jobCleanup) {
25       runJobCleanupTask(umbilical, reporter);
26       return;
27     }
28     if (jobSetup) {
29       runJobSetupTask(umbilical, reporter);
30       return;
31     }
32     if (taskCleanup) {
33       runTaskCleanupTask(umbilical, reporter);
34       return;
35     }
36 
37     if (useNewApi) {
38       runNewMapper(job, splitMetaInfo, umbilical, reporter);
39     } else {
40       runOldMapper(job, splitMetaInfo, umbilical, reporter);
41     }
42     done(umbilical, reporter);
43   }
複製程式碼

MapTask先判斷是否有Reduce任務,如果沒有的話,Map任務結束則整個提交的作業結束;如果有的話,當Map任務完成的時候設定當前進度為66.7%,Sort完成的時候設定進度為33.3%;

之後啟動TaskReporter,用於更新當前狀態;

之後初始化任務,設定當前任務的狀態為RUNNING,設定輸出目錄等;

之後判斷任務是不是jobCleanup任務、jobSetup任務、taskCleanup任務,並做相應的處理;

之後判斷使用新舊哪套API,因為MapTask要相容兩套API;

確定以後呼叫runNewMapper方法,執行具體的map;

作業完成以後呼叫done方法,進行任務的清理、計數器的更新、任務狀態更新等;

hadoop2的話是使用runNewMapper()

複製程式碼
 1   @SuppressWarnings("unchecked")
 2   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
 3   void runNewMapper(final JobConf job,
 4                     final TaskSplitIndex splitIndex,
 5                     final TaskUmbilicalProtocol umbilical,
 6                     TaskReporter reporter
 7                     ) throws IOException, ClassNotFoundException,
 8                              InterruptedException {
 9     // make a task context so we can get the classes
10     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
11       new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
12                                                                   getTaskID(),
13                                                                   reporter);
14     // make a mapper
15     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
16       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
17         ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
18     // make the input format
19     org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
20       (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
21         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
22     // rebuild the input split
23     org.apache.hadoop.mapreduce.InputSplit split = null;
24     split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
25         splitIndex.getStartOffset());
26     LOG.info("Processing split: " + split);
27 
28     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
29       new NewTrackingRecordReader<INKEY,INVALUE>
30         (split, inputFormat, reporter, taskContext);
31     
32     job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
33     org.apache.hadoop.mapreduce.RecordWriter output = null;
34     
35     // get an output object
36     if (job.getNumReduceTasks() == 0) {
37       output = 
38         new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
39     } else {
40       output = new NewOutputCollector(taskContext, job, umbilical, reporter);
41     }
42 
43     org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
44     mapContext = 
45       new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
46           input, output, 
47           committer, 
48           reporter, split);
49 
50     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
51         mapperContext = 
52           new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
53               mapContext);
54 
55     try {
56       input.initialize(split, mapperContext);
57       mapper.run(mapperContext);
58       mapPhase.complete();
59       setPhase(TaskStatus.Phase.SORT);
60       statusUpdate(umbilical);
61       input.close();
62       input = null;
63       output.close(mapperContext);
64       output = null;
65     } finally {
66       closeQuietly(input);
67       closeQuietly(output, mapperContext);
68     }
69   }
複製程式碼

它的執行過程是:

1.獲取配置資訊類物件TaskAttemptContextImplement、自己開發的Mapper例項、使用者指定的InputFormat物件(預設是TextInputFormat)、任務對應的分片資訊split;

2.根據inputFormat構建一個NewTrackingRecordReader物件,這個物件中的RecordReader<K,V> real是LineRecordReader,用於讀取分片中的內容,傳遞給Mapper的map方法處理;

3.執行Mapper中的setup方法;

4.迴圈執行map方法;

5.執行cleanup方法;

6.最後是輸出流的關閉output.close(mapperContext),該方法會執行MapOutputBuffer.flush()操作,將剩餘資料也通過sortAndSpill()方法寫入本地檔案,並在最後呼叫mergeParts()方法合併所有的spill檔案。

關於spill,spill是map中比較重要的設計

spill過程包括輸出、排序、溢寫、合併等步驟;

每個Map任務不斷的以<key,value>對的形式把資料輸出到記憶體中構造一個環形的資料結構。這個資料結構其實是一個位元組陣列,叫kvbuffer,這裡面不僅有<Key,Value>資料,還有索引資料,並且給放置索引資料的區域起了一個kvmeta的別名

當這個緩衝區滿足一定條件後就會對緩衝區kvbuffer中的資料進行排序,先按分割槽編號partition進行升序,然後按照key進行升序。這樣快速排序後資料以分割槽為單位聚集在一起,且同一分割槽的所有資料按照key有序。然後通過sortAndSpill方法寫到本地檔案和索引檔案;如果有combiner,spill之前也會做一次聚集操作,等資料跑完通過歸併合併所有spill檔案和索引檔案。

Map階段的結果都會儲存在本地種(如果有reducer的話),非HDFS。