1. 程式人生 > 其它 >MapReduce工作流程

MapReduce工作流程

MapReduce工作流程圖

流程詳解(重點)

MapTask

  1. 待處理的文字通過submit()方法,獲取待處理的資料資訊,然後根據InputFormat切片方法機制,生成切片方法檔案。把切片方法檔案和資源配置檔案全部提交在資源路徑。提交的資訊有:Job.split、wc.jar、Job.xml
  2. 把資源提交路徑下的資訊檔案提交給YARN叢集或者本地記憶體中,之後YARN叢集根據提交的資訊啟動Mr appmaster主機
  3. Mr appmaster根據提交的切片規劃機制計算出MapTask的數量,生成對應的MapTask任務。
  4. MapTask任務根據Mr appmaster分配的資料量,去原始檔案當中讀取相應切片的資料。讀取資料需要藉助InputFormat中定義的 RecordReader(如果沒有定義,則會預設使用TextInputFormat)進行切片
  5. 如果使用預設切片方法,則每讀取一行,在Mapper類中進行邏輯運算,即通過獲取到的key-value值確定檔案傳到Reduce類中的key-value值
  6. 邏輯運算完成後,通過Context.write(k,v)方法進行資料傳輸。此時先將結果(<k,v>資料)寫入到一個環形緩衝區,在這個環形緩衝區中一分為二,從右側的緩衝區開始寫入資料(<k,v>鍵值對),從左側開始寫入資料的索引。
  7. 在往環形緩衝區寫入資料的過程中,不僅要寫入資料的索引,還需要寫出資料所在分割槽(MapReduce中分割槽預設有一個,也可以指定多個分割槽)。在聲明瞭所在的分割槽之後,要對<k,v>鍵值對進行排序。排序不是每時每刻進行的,是在資料寫入完成,或者資料往磁碟溢寫的時候,要進行一次排序
  8. 當記憶體已存入80%時,將環形緩衝區中的資料溢寫到磁碟中,並將緩衝區中的資料清空,之後在反向寫入資料和索引(順時針改為逆時針/逆時針改為順時針)。如果資料過多,會進行多次溢寫,溢寫時根據分割槽溢寫,並且每個分割槽內的資料是有序的
  9. 對溢寫檔案通過Merge進行歸併排序,之後通過Combiner對資料進行合併

ReduceTask

  1. MapTask資料處理完成之後,將資料寫入緩衝區分割槽或者磁碟中去,磁碟中儲存的檔案資訊與緩衝區儲存的檔案資訊相同,都有<k,v>鍵值對及其分割槽和索引。
  2. 在所有的MapTask任務完成後,Mr appmaster做出響應。Mr appmaster根據MapTask輸入的分割槽數確定ReduceTask的數量
  3. 之後Mr appmaster啟動相應數量的ReduceTask,並告知ReduceTask處理資料的範圍(資料分割槽),一個分割槽需要有一個ReduceTask處理資料。ReduceTask1處理partition 1的資料,ReduceTask2處理partition2的資料……
  4. ReduceTask將MapTask中相應分割槽中的資料下載到ReduceTask本地磁碟中,並將檔案進行合併,之後再對檔案中的資料進行歸併排序
  5. 通過GroupingComparator(k,knext)方法對檔案進行分組,之後將key值相同的資料呼叫Reduce(k,v)方法,一次讀取一組
  6. 通過OutPutFormat將資料輸入到結果檔案中(預設呼叫TextOutPutFormat)。如果有多個ReduceTask,則寫入到多個輸出檔案中

Shuffle

  1. maptask收集我們的map()方法輸出的kv對,放到記憶體緩衝區中
  2. 從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案
  3. 多個溢位檔案會被合併成大的溢位檔案
  4. 在溢位過程中及合併的過程中,都要呼叫partitioner進行分割槽和針對key進行排序
  5. reducetask根據自己的分割槽號,去各個maptask機器上取相應的結果分割槽資料
  6. reducetask會取到同一個分割槽的來自不同maptask的結果檔案,reducetask會將這些檔案再進行合併(歸併排序)
  7. 合併成大檔案後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從檔案中取出一個一個的鍵值對group,呼叫使用者自定義的reduce()方法)

總結

  • MR流程總共分為四個階段:
  1. submit階段:切片job.split和配置項資訊job.xml形成檔案提交到一個資源路徑,然後通過YARN啟動執行
  2. MapTask任務處理階段:讀取切片資料、處理切片資料
  3. Shuffle階段(MapTask任務執行結束到ReduceTask任務執行之前):MapTask寫出資料到環形緩衝區、分割槽、排序、溢寫檔案……
  4. ReduceTask任務處理階段:讀取環形緩衝區資料、讀取溢寫檔案資料、reduce運算邏輯、輸出結果到輸出檔案
  • 注意:

  Shuffle中的緩衝區大小會影響到mapreduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快。

   緩衝區的大小可以通過引數調整,引數:io.sort.mb 預設100M