MapReduce工作流程
阿新 • • 發佈:2021-08-08
MapReduce工作流程圖
流程詳解(重點)
MapTask
- 待處理的文字通過submit()方法,獲取待處理的資料資訊,然後根據InputFormat切片方法機制,生成切片方法檔案。把切片方法檔案和資源配置檔案全部提交在資源路徑。提交的資訊有:Job.split、wc.jar、Job.xml
- 把資源提交路徑下的資訊檔案提交給YARN叢集或者本地記憶體中,之後YARN叢集根據提交的資訊啟動Mr appmaster主機
- Mr appmaster根據提交的切片規劃機制計算出MapTask的數量,生成對應的MapTask任務。
- MapTask任務根據Mr appmaster分配的資料量,去原始檔案當中讀取相應切片的資料。讀取資料需要藉助InputFormat中定義的 RecordReader(如果沒有定義,則會預設使用TextInputFormat)進行切片
- 如果使用預設切片方法,則每讀取一行,在Mapper類中進行邏輯運算,即通過獲取到的key-value值確定檔案傳到Reduce類中的key-value值
- 邏輯運算完成後,通過Context.write(k,v)方法進行資料傳輸。此時先將結果(<k,v>資料)寫入到一個環形緩衝區,在這個環形緩衝區中一分為二,從右側的緩衝區開始寫入資料(<k,v>鍵值對),從左側開始寫入資料的索引。
- 在往環形緩衝區寫入資料的過程中,不僅要寫入資料的索引,還需要寫出資料所在分割槽(MapReduce中分割槽預設有一個,也可以指定多個分割槽)。在聲明瞭所在的分割槽之後,要對<k,v>鍵值對進行排序。排序不是每時每刻進行的,是在資料寫入完成,或者資料往磁碟溢寫的時候,要進行一次排序
- 當記憶體已存入80%時,將環形緩衝區中的資料溢寫到磁碟中,並將緩衝區中的資料清空,之後在反向寫入資料和索引(順時針改為逆時針/逆時針改為順時針)。如果資料過多,會進行多次溢寫,溢寫時根據分割槽溢寫,並且每個分割槽內的資料是有序的
- 對溢寫檔案通過Merge進行歸併排序,之後通過Combiner對資料進行合併
ReduceTask
- MapTask資料處理完成之後,將資料寫入緩衝區分割槽或者磁碟中去,磁碟中儲存的檔案資訊與緩衝區儲存的檔案資訊相同,都有<k,v>鍵值對及其分割槽和索引。
- 在所有的MapTask任務完成後,Mr appmaster做出響應。Mr appmaster根據MapTask輸入的分割槽數確定ReduceTask的數量
- 之後Mr appmaster啟動相應數量的ReduceTask,並告知ReduceTask處理資料的範圍(資料分割槽),一個分割槽需要有一個ReduceTask處理資料。ReduceTask1處理partition 1的資料,ReduceTask2處理partition2的資料……
- ReduceTask將MapTask中相應分割槽中的資料下載到ReduceTask本地磁碟中,並將檔案進行合併,之後再對檔案中的資料進行歸併排序
- 通過GroupingComparator(k,knext)方法對檔案進行分組,之後將key值相同的資料呼叫Reduce(k,v)方法,一次讀取一組
- 通過OutPutFormat將資料輸入到結果檔案中(預設呼叫TextOutPutFormat)。如果有多個ReduceTask,則寫入到多個輸出檔案中
Shuffle
- maptask收集我們的map()方法輸出的kv對,放到記憶體緩衝區中
- 從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案
- 多個溢位檔案會被合併成大的溢位檔案
- 在溢位過程中及合併的過程中,都要呼叫partitioner進行分割槽和針對key進行排序
- reducetask根據自己的分割槽號,去各個maptask機器上取相應的結果分割槽資料
- reducetask會取到同一個分割槽的來自不同maptask的結果檔案,reducetask會將這些檔案再進行合併(歸併排序)
- 合併成大檔案後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從檔案中取出一個一個的鍵值對group,呼叫使用者自定義的reduce()方法)
總結
- MR流程總共分為四個階段:
- submit階段:切片job.split和配置項資訊job.xml形成檔案提交到一個資源路徑,然後通過YARN啟動執行
- MapTask任務處理階段:讀取切片資料、處理切片資料
- Shuffle階段(MapTask任務執行結束到ReduceTask任務執行之前):MapTask寫出資料到環形緩衝區、分割槽、排序、溢寫檔案……
- ReduceTask任務處理階段:讀取環形緩衝區資料、讀取溢寫檔案資料、reduce運算邏輯、輸出結果到輸出檔案
- 注意:
Shuffle中的緩衝區大小會影響到mapreduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快。
緩衝區的大小可以通過引數調整,引數:io.sort.mb 預設100M