1. 程式人生 > >hadoop核心執行原理

hadoop核心執行原理

我們通過下面這個天氣資料處理的例子來說明Hadoop的執行原理.

1Map-Reduce的邏輯過程

假設我們需要處理一批有關天氣的資料,其格式如下:

·        按照ASCII碼儲存,每行一條記錄

·        每一行字元從0開始計數,第15個到第18個字元為年

·        25個到第29個字元為溫度,其中第25位是符號+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078

+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+

現在需要統計出每年的最高溫度。

Map-Reduce主要包括兩個步驟:MapReduce

每一步都有key-value對作為輸入和輸出:

·        map階段的key-value對的格式是由輸入的格式所決定的,如果是預設的TextInputFormat,則每行作為一個記錄程序處理,其中key為此行的開頭相對於檔案的起始位置,

value就是此行的字元文字

·        map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應

對於上面的例子,在map過程,輸入的key-value對如下:

(0, 0067011990999991950051507+0000+)

(33, 0043011990999991950051512+0022+)

(66, 0043011990999991950051518-0011+)

(99, 0043012650999991949032412+0111+)

(132, 0043012650999991949032418+0078+)

(165, 0067011990999991937051507+0001+)

(198, 0043011990999991937

051512-0002+)

(231, 0043011990999991945051518+0001+)

(264, 0043012650999991945032412+0002+)

(297, 0043012650999991945032418+0078+)

map過程中,通過對每一行字串的解析,得到年-溫度的key-value對作為輸出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

reduce過程,將map過程中的輸出,按照相同的keyvalue放到同一個列表中作為reduce的輸入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

reduce過程中,在列表中選擇出最大的溫度,將年-最大溫度的key-value作為輸出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

其邏輯過程可用如下圖表示:

下圖大概描述了Map-ReduceJob執行的基本原理:

下面我們討論JobConf,其有很多的項可以進行配置:

·        setInputFormat:設定map的輸入格式,預設為TextInputFormatkeyLongWritable,valueText

·        setNumMapTasks:設定map任務的個數,此設定通常不起作用,map任務的個數取決於輸入的資料所能分成的input split的個數

·        setMapperClass:設定Mapper,預設為IdentityMapper

·        setMapRunnerClass:設定MapRunner,map task是由MapRunner執行的,預設為MapRunnable,其功能為讀取input split的一個個record,依次呼叫Mappermap函式

·        setMapOutputKeyClasssetMapOutputValueClass:設定Mapper的輸出的key-value對的格式

·        setOutputKeyClasssetOutputValueClass:設定Reducer的輸出的key-value對的格式

·        setPartitionerClasssetNumReduceTasks:設定Partitioner,預設為HashPartitioner,其根據keyhash值來決定進入哪個partition,每個partition被一個reducetask處理,所以partition的個數等於reducetask的個數

·        setReducerClass:設定Reducer,預設為IdentityReducer

·        setOutputFormat:設定任務的輸出格式,預設為TextOutputFormat

·        FileInputFormat.addInputPath:設定輸入檔案的路徑,可以使一個檔案,一個路徑,一個萬用字元。可以被呼叫多次新增多個路徑

·        FileOutputFormat.setOutputPath:設定輸出檔案的路徑,在job執行前此路徑不應該存在

當然不用所有的都設定,由上面的例子,可以編寫Map-Reduce程式如下:

public class MaxTemperature {

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {

            System.err.println("Usage: MaxTemperature <input path><output path>");

            System.exit(-1);

        }

        JobConf conf = new JobConf(MaxTemperature.class);

        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);

        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);

    }

}

3Map-Reduce資料流(data flow)

Map-Reduce的處理過程主要涉及以下四個部分:

·        客戶端Client:用於提交Map-reduce任務job

·        JobTracker:協調整個job的執行,其為一個Java程序,其mainclassJobTracker

·        TaskTracker:執行此jobtask,處理inputsplit,其為一個Java程序,其mainclassTaskTracker

·        HDFShadoop分散式檔案系統,用於在各個程序間共享Job相關的檔案

3.1、任務提交

JobClient.runJob()建立一個新的JobClient例項,呼叫其submitJob()函式。

·        JobTracker請求一個新的job ID

·        檢測此joboutput配置

·        計算此jobinput splits

·        Job執行所需的資源拷貝到JobTracker的檔案系統中的資料夾中,包括job jar檔案,job.xml配置檔案,input splits

·        通知JobTrackerJob已經可以運行了

提交任務後,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令列,直到任務執行完畢。

3.2、任務初始化

JobTracker收到submitJob呼叫的時候,將此任務放到一個佇列中,job排程器將從佇列中獲取任務並初始化任務。

初始化首先建立一個物件來封裝job執行的tasks,status以及progress

在建立task之前,job排程器首先從共享檔案系統中獲得JobClient計算出的inputsplits

其為每個inputsplit建立一個maptask

每個task被分配一個ID

3.3、任務分配

TaskTracker週期性的向JobTracker傳送heartbeat

heartbeat中,TaskTracker告知JobTracker其已經準備執行一個新的taskJobTracker將分配給其一個task

JobTrackerTaskTracker選擇一個task之前,JobTracker必須首先按照優先順序選擇一個Job,在最高優先順序的Job中選擇一個task

TaskTracker有固定數量的位置來執行map task或者reduce task

預設的排程器對待maptask優先於reducetask

當選擇reducetask的時候,JobTracker並不在多個task之間進行選擇,而是直接取下一個,因為reduce task沒有資料本地化的概念。

3.4、任務執行

TaskTracker被分配了一個task,下面便要執行此task

首先,TaskTracker將此jobjar從共享檔案系統中拷貝到TaskTracker的檔案系統中。

TaskTrackerdistributed cache中將job執行所需要的檔案拷貝到本地磁碟。

其次,其為每個task建立一個本地的工作目錄,將jar解壓縮到檔案目錄中。

其三,其建立一個TaskRunner來執行task

TaskRunner建立一個新的JVM來執行task

被建立的childJVMTaskTracker通訊來報告執行進度。

3.4.1Map的過程

MapRunnableinput split中讀取一個個的record,然後依次呼叫Mappermap函式,將結果輸出。

map的輸出並不是直接寫入硬碟,而是將其寫入快取memory buffer

buffer中資料的到達一定的大小,一個背景執行緒將資料開始寫入硬碟。

在寫入硬碟之前,記憶體中的資料通過partitioner分成多個partition

在同一個partition中,背景執行緒會將資料按照key在記憶體中排序。

每次從記憶體向硬碟flush資料,都生成一個新的spill檔案。

當此task結束之前,所有的spill檔案被合併為一個整的被partition的而且排好序的檔案。

reducer可以通過http協議請求map的輸出檔案,tracker.http.threads可以設定http服務執行緒數。

3.4.2Reduce的過程

map task結束後,其通知TaskTrackerTaskTracker通知JobTracker

對於一個jobJobTracker知道TaskTracermap輸出的對應關係。

reducer中一個執行緒週期性的向JobTracker請求map輸出的位置,直到其取得了所有的map輸出。

reduce task需要其對應的partition的所有的map輸出。

reduce task中的copy過程即當每個map task結束的時候就開始拷貝輸出,因為不同的map task完成時間不同。

reduce task中有多個copy執行緒,可以並行拷貝map輸出。

當很多map輸出拷貝到reducetask後,一個背景執行緒將其合併為一個大的排好序的檔案。

當所有的map輸出都拷貝到reducetask後,進入sort過程,將所有的map輸出合併為大的排好序的檔案。

最後進入reduce過程,呼叫reducerreduce函式,處理排好序的輸出的每個key,最後的結果寫入HDFS

3.5、任務結束

JobTracker獲得最後一個task的執行成功的報告後,將job得狀態改為成功。

JobClientJobTracker輪詢的時候,發現此job已經成功結束,則向用戶列印訊息,從runJob函式中返回。