1. 程式人生 > 實用技巧 >MapReduce的執行流程概述

MapReduce的執行流程概述

MapReduce處理資料的大致流程

InputFormat呼叫RecordReader,從輸入目錄的檔案中,讀取一組資料,封裝為keyin-valuein物件

②將封裝好的key-value,交給Mapper.map()------>將處理的結果寫出 keyout-valueout

ReduceTask啟動Reducer,使用Reducer.reduce()處理Mapper的keyout-valueout

④OutPutFormat呼叫RecordWriter,將Reducer處理後的keyout-valueout寫出到檔案

關於這些名詞的解釋參考我之前的文章MapReduce計算框架的核心程式設計思想

示例

需求: 統計/hello目錄中每個檔案的單詞數量,

a-p開頭的單詞放入到一個結果檔案中,

q-z開頭的單詞放入到另外一個結果檔案中。

例如:

/hello/a.txt,檔案大小200M

hello,hi,hadoop

hive,hadoop,hive,

zoo,spark,wow

zoo,spark,wow

...

/hello/b.txt,檔案大小100M

hello,hi,hadoop

zoo,spark,wow

...

1. Map階段(執行MapTask,將一個大的任務切分為若干小任務,處理輸出階段性的結果)

①切片(切分資料)

/hello/a.txt 200M

/hello/b.txt 100M

預設的切分策略是以檔案為單位,以檔案的塊大小(128M)為片大小進行切片!

split0:/hello/a.txt,0-128M

split1: /hello/a.txt,128M-200M

split2: /hello/b.txt,0M-100M

②執行MapTask(程式),每個MapTask負責一片資料

split0:/hello/a.txt,0-128M--------MapTask1

split1: /hello/a.txt,128M-200M--------MapTask2

split2: /hello/b.txt,0M-100M--------MapTask3

③讀取資料階段

在MR中,所有的資料必須封裝為key-value

MapTask1,2,3都會初始化一個InputFormat(預設TextInputFormat),每個InputFormat物件負責建立一個RecordReader(LineRecordReader)物件,


RecordReader負責從每個切片的資料中讀取資料,封裝為key-value

LineRecordReader: 將檔案中的每一行封裝為一個key(offset)-value(當前行的內容)

舉例:

hello,hi,hadoop----->(0,hello,hi,hadoop)

hive,hadoop,hive----->(20,hive,hadoop,hive)

zoo,spark,wow----->(30,zoo,spark,wow)

zoo,spark,wow----->(40,zoo,spark,wow)

④進入Mapper的map()階段

map()是Map階段的核心處理邏輯! 單詞統計! map()會迴圈呼叫,對輸入的每個Key-value都進行處理!

輸入:(0,hello,hi,hadoop)

輸出:(hello,1),(hi,1),(hadoop,1)

輸入:(20,hive,hadoop,hive)

輸出:(hive,1),(hadoop,1),(hive,1)

輸入:(30,zoo,spark,wow)

輸出:(zoo,1),(spark,1),(wow,1)

輸入:(40,zoo,spark,wow)

輸出:(zoo,1),(spark,1),(wow,1)

⑤將MapTask輸出的記錄進行分割槽(分組、分類)

在Mapper輸出後,呼叫Partitioner,對Mapper輸出的key-value進行分割槽,分割槽後也會排序(預設字典順序排序)

分割槽規則:

  • a-p開頭的單詞放入到一個區
  • q-z開頭的單詞放入到另一個區

    MapTask1:

    0號區: (hadoop,1),(hadoop,1),(hello,1),(hi,1),(hive,1),(hive,1)

    1號區: (spark,1),(spark,1),(wow,1) ,(wow,1),(zoo,1)(zoo,1)

MapTask2:

0號區: ...

1號區: ...

MapTask3:

0號區: (hadoop,1),(hello,1),(hi,1),

1號區: (spark,1),(wow,1),(zoo,1)

2.Reduce階段

①因為需求是生成兩個結果檔案,所以我們需要啟動兩個ReduceTask

ReduceTask啟動後,會啟動shuffle執行緒,從MapTask中拷貝相應分割槽的資料!

ReduceTask1: 只負責0號區

將三個MapTask,生成的0號區資料全部拷貝到ReduceTask所在的機器!

(hadoop,1),(hadoop,1),(hello,1),(hi,1),(hive,1),(hive,1)

(hadoop,1),(hello,1),(hi,1),

ReduceTask2: 只負責1號區

將三個MapTask,生成的1號區資料全部拷貝到ReduceTask所在的機器!

(spark,1),(spark,1),(wow,1) ,(wow,1),(zoo,1)(zoo,1)

(spark,1),(wow,1),(zoo,1)

②sort

ReduceTask1: 只負責0號區進行排序:

(hadoop,1),(hadoop,1),(hadoop,1),(hello,1),(hello,1),(hi,1),(hi,1),(hive,1),(hive,1)

ReduceTask2: 只負責1號區進行排序:

(spark,1),(spark,1),(spark,1),(wow,1) ,(wow,1),(wow,1),(zoo,1),(zoo,1)(zoo,1)

③reduce

ReduceTask1---->Reducer----->reduce(一次讀入一組資料)

何為一組資料: key相同的為一組資料

輸入: (hadoop,1),(hadoop,1),(hadoop,1)

輸出: (hadoop,3)

輸入: (hello,1),(hello,1)

輸出: (hello,2)

輸入: (hi,1),(hi,1)

輸出: (hi,2)

輸入:(hive,1),(hive,1)

輸出: (hive,2)

ReduceTask2---->Reducer----->reduce(一次讀入一組資料)

輸入: (spark,1),(spark,1),(spark,1)

輸出: (spark,3)

輸入: (wow,1) ,(wow,1),(wow,1)

輸出: (wow,3)

輸入:(zoo,1),(zoo,1)(zoo,1)

輸出: (zoo,3)

④呼叫OutPutFormat中的RecordWriter將Reducer輸出的記錄寫出

ReduceTask1---->OutPutFormat(預設TextOutPutFormat)---->RecordWriter(LineRecoreWriter)

LineRecoreWriter將一個key-value以一行寫出,key和alue之間使用\t分割

在輸出目錄中,生成檔案part-r-0000

hadoop 3

hello 2

hi 2

hive 2

ReduceTask2---->OutPutFormat(預設TextOutPutFormat)------>RecordWriter(LineRecoreWriter)

LineRecoreWriter將一個key-value以一行寫出,key和alue之間使用\t分割

在輸出目錄中,生成檔案part-r-0001

spark 3

wow 3

zoo 3

三、MR總結

Map階段(MapTask): 切片(Split)-----讀取資料(Read)-------交給Mapper處理(Map)------分割槽和排序(sort)

Reduce階段(ReduceTask): 拷貝資料(copy)------排序(sort)-----合併(reduce)-----寫出(write)