hadoop mapTask執行過程
阿新 • • 發佈:2019-01-04
1、執行run方法-->runJobSetupTask-->runNewMapper-->input.initialize(split,
mapperContext)-->mapper.run(mapperContext)
2、重複呼叫map()方法,執行context.write(key,value)-->TaskInputOutputContext中的write()-->TaskInputOutputContextImpl中的write()-->NewOutputCollector中的write()方法,write()方法呼叫collector.collect()-->MapOutputCollector中的collect()-->MapOutputBuffer(MapOutputCollector的實現類)中的collect()。
3、MapOutputBuffer的collect方法中把key和value序列化後儲存在一個環形快取中,如果快取滿了則會呼叫startspill方法
設定訊號量,使得一個獨立的執行緒SpillThread可以對快取中的資料進行處理。
4、SpillThread執行緒的run方法中呼叫sortAndSpill方法對快取中的資料進行排序後寫溢位檔案。
5、當map輸出完成後,會呼叫output的close方法。
6、在close方法中需要對緩衝區做一些最後的清理,呼叫MapOutputBuffer類中flush方法,合併spill{n}檔案產生最後的輸出。先等待可能的spill過程完成,然後判斷緩衝區是否為空,如果不是,則呼叫sortAndSpill,做最後的spill,然後結束spill執行緒,然後清空kvbuffer,最後呼叫mergeParts()。
--->sortAndSpill():先對鍵值進行排序,如果沒有combinerRunner直接溢寫,否則先進行combine然後再溢寫。
--->mergeParts():merge是將多個溢寫檔案合併到一個檔案,所以可能也有相同的key存在,在這個過程中如果配置設定過Combiner,也會使用Combiner來合併相同的key。?mapreduce讓每個map只輸出一個檔案,並且為這個檔案提供一個索引檔案,以記錄每個reduce對應資料的偏移量。
2、重複呼叫map()方法,執行context.write(key,value)-->TaskInputOutputContext中的write()-->TaskInputOutputContextImpl中的write()-->NewOutputCollector中的write()方法,write()方法呼叫collector.collect()-->MapOutputCollector中的collect()-->MapOutputBuffer(MapOutputCollector的實現類)中的collect()。
3、MapOutputBuffer的collect方法中把key和value序列化後儲存在一個環形快取中,如果快取滿了則會呼叫startspill方法
設定訊號量,使得一個獨立的執行緒SpillThread可以對快取中的資料進行處理。
4、SpillThread執行緒的run方法中呼叫sortAndSpill方法對快取中的資料進行排序後寫溢位檔案。
5、當map輸出完成後,會呼叫output的close方法。
6、在close方法中需要對緩衝區做一些最後的清理,呼叫MapOutputBuffer類中flush方法,合併spill{n}檔案產生最後的輸出。先等待可能的spill過程完成,然後判斷緩衝區是否為空,如果不是,則呼叫sortAndSpill,做最後的spill,然後結束spill執行緒,然後清空kvbuffer,最後呼叫mergeParts()。
--->sortAndSpill():先對鍵值進行排序,如果沒有combinerRunner直接溢寫,否則先進行combine然後再溢寫。
--->mergeParts():merge是將多個溢寫檔案合併到一個檔案,所以可能也有相同的key存在,在這個過程中如果配置設定過Combiner,也會使用Combiner來合併相同的key。?mapreduce讓每個map只輸出一個檔案,並且為這個檔案提供一個索引檔案,以記錄每個reduce對應資料的偏移量。