hadoop2 作業執行過程之map過程
在執行MAP任務之前,先了解一下它的容器和它容器的領導:container和nodemanager
NodeManager
NodeManager(NM)是YARN中每個節點上的代理,它管理Hadoop叢集中的單個計算節點,包括與ResourceManager保持通訊,監督Container的生命週期管理,監控每個Container的資源使用(記憶體、CPU等)情況,追蹤節點健康狀況,管理日誌和不同應用程式用到的附屬服務(auxiliary service)
它包含以下幾大元件:
1.NodeStatusUpdater
當NM啟動時,該元件向RM註冊,併發送節點上的可用資源。接下來,NM與RM通訊,彙報Container的狀態更新,包括節點上正在執行的Container、已完成的Container等,此外,RM可能向NodeStatusUpdater發訊號,殺死處於執行中的Container
NodeStatusUpdater是NM和RM通訊的唯一通道,週期性地呼叫RPC函式nodeHeartbeat()向RM彙報節點上的各種資訊
2.ContainerManager
它是NM中的核心元件,實現類是ContainerManagerImpl。它有幾個元件組成,各自負責一部分功能,意管理執行在該節點上的所有Container
2.1RPC Server
它負責從AM上接收RPC請求以啟動Container或者停止。
供AM使用的介面分別是:startContainer()、stopContainer()、getContainerStatus()
2.2ResourceLocalizationService
負責(從HDFS)安全下載(採用多執行緒)和組織Container需要的各種檔案資源。
2.3ContainerLauncher
維護了一個執行緒池,隨時準備並在必要時儘快啟動Container。同時會接收來自RM或者AM的清理Container請求,清理相應程序
2.4AuxServices
NM提供了一個框架以通過配置附屬服務擴充套件自己的功能,這些服務是與NM其他服務隔離開的。
2.5ContainerMonitor
當一個Container啟動以後,該元件便開始觀察它在執行過程中的資源利用。NM啟動一個Container後,ContainerMonitor會將改Container程序對應的一個pid新增到監控列表中
2.6LogHandler
一個可插拔元件,使用者通過它可以宣傳將Container日誌寫到本地磁碟或者打包上傳到一個檔案系統中。
3.ContainerExecutor
與地產作業系統互動,安全存放Container需要的檔案和目錄,進而以以一種安全的方式啟動或者清除Container程序
4.NodeHealthCheckerService
週期性地執行一個配置好的腳步檢查節點的健康狀況,任何系統健康方面的改變都會通知NodeStatusUpdater傳遞給RM
5.Security
5.1ApplicationACLsManager 為所有面向用戶的API提供安全檢查
5.2ContainerTokenSecretManager 檢查收到各種請求的合法性,確保這些請求已被RM授權
6.WebServer
web展示
Container
Container的概念
首先它和Linux的Container完全不同。它的使用是啟動AM的時候和執行Task的時候,但是它的涉及則是RM向資源排程器申請啟動AM的資源時和AM向RM的資源排程器申請啟動Task資源時;
當向資源排程器申請資源時,需向它傳送一個ResourceRequest列表,其中描述了一個資源單元的詳細請求,而資源排程器為之返回分配的資源來描述Container。每個ResourceRequest可看做一個可序列化的Java物件
message ResourceRequestProto { optional PriorityProto priority = 1; // 資源優先順序 optional string resource_name = 2; // 資源名稱(期望資源所在的host、rack名稱等) optional ResourceProto capability = 3; // 資源量(僅支援CPU和記憶體兩種資源) optional int32 num_containers = 4; // 滿足以上條件的資源個數 optional bool relax_locality = 5 [default = true]; //是否支援本地性鬆弛 }
這些資源預設是本地鬆弛的,即申請優先順序為10,資源名稱為“node11”,資源量為<2GB,1CPU>的5份資源時,如果node11上沒有滿足要求的資源,則優先找node11同機架上其他資源,繼而找其他機架
AM收到一個或者多個Container後,再次將改Container進一步分配給內部的某個任務,一旦確定任務後,AM需將任務執行環境(包括執行命令、環境變數、依賴的外部檔案等)連同Container中的資源封裝到ContainerLaunchContext物件中,進而與對應的NM通訊,以啟動該任務
message ContainerLaunchContextProto { repeated StringLocalResourceMapProto localResources = 1; //Container啟動以來的外部資源 optional bytes tokens = 2; repeated StringBytesMapProto service_data = 3; repeated StringStringMapProto environment = 4; //Container啟動所需的環境變數 repeated string command = 5; //Container內部執行的任務啟動命令,如果是MapReduce的話,Map/Reduce Task啟動命令就在該欄位中 repeated ApplicationACLMapProto application_ACLs = 6; }
Container啟動步驟:
1.資源本地化
在本地拷貝一份執行Container所需的所有資源(通過Distributed Cache實現);
為Container建立經隔離的工作目錄,並在這些目錄中準備好所有資源;
YARN將資源分為兩類:一類是public級別的資源,放在公共目錄下,由所有使用者共享,另一類是private級別的資源,這類資源時使用者私有的,只能在所屬使用者的各個作業間共享。
2.啟動Container
啟動Container是由ContainerLauncher完成的;
3.執行Container
由ContainerExecutor完成
4.資源回收
由ResourceLocalizationService服務完成,該過程與資源本地化正好相反,它負責撤銷Container執行過程中使用的各種資源。
MAP
mapper就是在執行Container的時候執行的。主角上場。
Map任務是一類將輸入記錄轉換為中間格式記錄集的獨立任務。Mapper類中的map方法將輸入鍵值對對映到一組中間格式的鍵值對集合
Container啟動以後會根據AM傳過來的任務資訊啟動一個YarnChild程序來執行任務,YarnChild直接呼叫分給它的jvmTask,而jvmTask則判斷是map任務還是reduce任務來分別執行MapTask和ReduceTask來執行Map過程和Reduce過程
每個task都會使用一個程序佔用一個JVM來執行,org.apache.hadoop.mapred.Child方法是具體的JVM啟動類
taskFinal.run(job, umbilical); // run the task
if (taskComing) { boolean isMap = in.readBoolean(); if (isMap) { t = new MapTask(); } else { t = new ReduceTask(); } t.readFields(in); }
這裡的taskFinal就是jvmTask
自定義的Map類繼承自Mapper,由MapTask的run()方法來執行
1 @Override 2 public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 3 throws IOException, ClassNotFoundException, InterruptedException { 4 this.umbilical = umbilical; 5 6 if (isMapTask()) { 7 // If there are no reducers then there won't be any sort. Hence the map 8 // phase will govern the entire attempt's progress. 9 if (conf.getNumReduceTasks() == 0) { 10 mapPhase = getProgress().addPhase("map", 1.0f); 11 } else { 12 // If there are reducers then the entire attempt's progress will be 13 // split between the map phase (67%) and the sort phase (33%). 14 mapPhase = getProgress().addPhase("map", 0.667f); 15 sortPhase = getProgress().addPhase("sort", 0.333f); 16 } 17 } 18 TaskReporter reporter = startReporter(umbilical); 19 20 boolean useNewApi = job.getUseNewMapper(); 21 initialize(job, getJobID(), reporter, useNewApi); 22 23 // check if it is a cleanupJobTask 24 if (jobCleanup) { 25 runJobCleanupTask(umbilical, reporter); 26 return; 27 } 28 if (jobSetup) { 29 runJobSetupTask(umbilical, reporter); 30 return; 31 } 32 if (taskCleanup) { 33 runTaskCleanupTask(umbilical, reporter); 34 return; 35 } 36 37 if (useNewApi) { 38 runNewMapper(job, splitMetaInfo, umbilical, reporter); 39 } else { 40 runOldMapper(job, splitMetaInfo, umbilical, reporter); 41 } 42 done(umbilical, reporter); 43 }
MapTask先判斷是否有Reduce任務,如果沒有的話,Map任務結束則整個提交的作業結束;如果有的話,當Map任務完成的時候設定當前進度為66.7%,Sort完成的時候設定進度為33.3%;
之後啟動TaskReporter,用於更新當前狀態;
之後初始化任務,設定當前任務的狀態為RUNNING,設定輸出目錄等;
之後判斷任務是不是jobCleanup任務、jobSetup任務、taskCleanup任務,並做相應的處理;
之後判斷使用新舊哪套API,因為MapTask要相容兩套API;
確定以後呼叫runNewMapper方法,執行具體的map;
作業完成以後呼叫done方法,進行任務的清理、計數器的更新、任務狀態更新等;
hadoop2的話是使用runNewMapper()
1 @SuppressWarnings("unchecked") 2 private <INKEY,INVALUE,OUTKEY,OUTVALUE> 3 void runNewMapper(final JobConf job, 4 final TaskSplitIndex splitIndex, 5 final TaskUmbilicalProtocol umbilical, 6 TaskReporter reporter 7 ) throws IOException, ClassNotFoundException, 8 InterruptedException { 9 // make a task context so we can get the classes 10 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = 11 new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 12 getTaskID(), 13 reporter); 14 // make a mapper 15 org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = 16 (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) 17 ReflectionUtils.newInstance(taskContext.getMapperClass(), job); 18 // make the input format 19 org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = 20 (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) 21 ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); 22 // rebuild the input split 23 org.apache.hadoop.mapreduce.InputSplit split = null; 24 split = getSplitDetails(new Path(splitIndex.getSplitLocation()), 25 splitIndex.getStartOffset()); 26 LOG.info("Processing split: " + split); 27 28 org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = 29 new NewTrackingRecordReader<INKEY,INVALUE> 30 (split, inputFormat, reporter, taskContext); 31 32 job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); 33 org.apache.hadoop.mapreduce.RecordWriter output = null; 34 35 // get an output object 36 if (job.getNumReduceTasks() == 0) { 37 output = 38 new NewDirectOutputCollector(taskContext, job, umbilical, reporter); 39 } else { 40 output = new NewOutputCollector(taskContext, job, umbilical, reporter); 41 } 42 43 org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 44 mapContext = 45 new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 46 input, output, 47 committer, 48 reporter, split); 49 50 org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 51 mapperContext = 52 new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( 53 mapContext); 54 55 try { 56 input.initialize(split, mapperContext); 57 mapper.run(mapperContext); 58 mapPhase.complete(); 59 setPhase(TaskStatus.Phase.SORT); 60 statusUpdate(umbilical); 61 input.close(); 62 input = null; 63 output.close(mapperContext); 64 output = null; 65 } finally { 66 closeQuietly(input); 67 closeQuietly(output, mapperContext); 68 } 69 }
它的執行過程是:
1.獲取配置資訊類物件TaskAttemptContextImplement、自己開發的Mapper例項、使用者指定的InputFormat物件(預設是TextInputFormat)、任務對應的分片資訊split;
2.根據inputFormat構建一個NewTrackingRecordReader物件,這個物件中的RecordReader<K,V> real是LineRecordReader,用於讀取分片中的內容,傳遞給Mapper的map方法處理;
3.執行Mapper中的setup方法;
4.迴圈執行map方法;
5.執行cleanup方法;
6.最後是輸出流的關閉output.close(mapperContext),該方法會執行MapOutputBuffer.flush()操作,將剩餘資料也通過sortAndSpill()方法寫入本地檔案,並在最後呼叫mergeParts()方法合併所有的spill檔案。
關於spill,spill是map中比較重要的設計
spill過程包括輸出、排序、溢寫、合併等步驟;
每個Map任務不斷的以<key,value>對的形式把資料輸出到記憶體中構造一個環形的資料結構。這個資料結構其實是一個位元組陣列,叫kvbuffer,這裡面不僅有<Key,Value>資料,還有索引資料,並且給放置索引資料的區域起了一個kvmeta的別名
當這個緩衝區滿足一定條件後就會對緩衝區kvbuffer中的資料進行排序,先按分割槽編號partition進行升序,然後按照key進行升序。這樣快速排序後資料以分割槽為單位聚集在一起,且同一分割槽的所有資料按照key有序。然後通過sortAndSpill方法寫到本地檔案和索引檔案;如果有combiner,spill之前也會做一次聚集操作,等資料跑完通過歸併合併所有spill檔案和索引檔案。
Map階段的結果都會儲存在本地種(如果有reducer的話),非HDFS。