1. 程式人生 > 實用技巧 >【論文總結】MapReduce論文

【論文總結】MapReduce論文

摘要:

  1. MR是啥:程式設計模型,使用者只需編寫Map,Reduce兩個函式,系統完成分散式計算
  2. MR系統是啥:在大量普通計算機上實現並行化計算,系統只關心如何分割資料、大規模叢集的排程、叢集容錯、叢集通訊
  3. MR在Google的並行處理能力:上千臺機器上,處理TB級資料

介紹:

  • 問題:海量資料、資料分發、平行計算、容錯,開發、維護複雜,且不可複用
  • 核心:技術問題---》制約業務開發
  • 解決:封裝分散式處理的所有細節,提供統一的計算模型(MapReduce)
  • 為什麼是MapReduce:來源Lisp,函數語言程式設計的Map、Reduce原語,能解決Google絕大多數問題

實現:

執行過程

  1. 輸入檔案按照大小split成M份(16-64M),分發程式副本到各個worker上
  2. master控制任務流程,將M個Map任務與N個Reduce任務(使用者指定輸出桶數)分配給空閒的worker執行
  3. map任務讀取輸入split,將輸入key/value傳遞給Map函式,產生中間key/value,並快取在記憶體中、
  4. 快取中的key/value通過分割槽函式分成R個桶,並週期性寫到本地磁碟中。中間結果再磁碟中的位置會告知master,再由master告知reduce任務
  5. Reduce worker接收到master發來的儲存位置後,RPC讀取磁碟中的中間結果。不同map的中間結果雖然有序,但全域性無序,所以當所有中間結果讀取完後,會進行全域性排序(記憶體排序,不夠會外排)。
  6. Reduce worker讀取排序後的結果。對每一個key值,與中間value集合會呼叫一次reduce函式。reduce的輸出會被追加到所屬桶的輸出檔案中。
  7. 當所有map任務、reduce任務完成後,master喚醒使用者程式,呼叫返回。
  8. 若存在後續計算,可將當前輸出的N個檔案作為下一個MapReduce任務的輸入。

容錯

MapReduce通常在上千臺機器組成的叢集上執行,所以發生機器故障是常態

master故障

Master儲存了每一個Map和Reduce任務的狀態(空閒、工作中、完成),以及Worker機器數,以及其狀態

中間檔案的儲存位置通過Master從Map傳遞到Reduce。對於每個已完成的Map任務,Master儲存了Map任務產生的R箇中間檔案儲存的大小和位置

當Map任務完成時,Master接收到位置和大小更新資訊,這些資訊被逐步遞增的推送給那些正在工作的Reduce任務

  • Master週期性將上述資料結構寫入磁碟,即檢查點(checkpoint)。可以從最後一個檢查點開始啟動另外一個Master程序。Master失效再恢復是比較麻煩的,因此當前實現是,如果Master失效,就終止MapReduce運算。
worker故障
  • Master週期性ping每個worker。如果在一個約定時間沒收到worker返回的資訊,Master將這個worker標記為失效。所有由這個失效的worker完成的Map任務被重設為空閒。同樣,worker失效時正在執行的Map或Reduce任務也將被重置了空閒狀態,等待被重新排程
備用任務

影響一個MapReduce任務的總執行時間最通常的因素是“落伍者”:由於機器資源不均衡,可能某幾臺機器上的Map或Reduce任務花了很長時間才完成。比如磁碟老化,讀寫速度很慢,由比如此機器負載較高。

  • 當一個MapReduce操作解決完成的時候,Master排程備用的任務程序來執行剩下的處於處理中狀態的任務。最後無論最初的執行程序、還是備用的執行程序完成了任務,我們都標記為已完成。(部分實驗效率提升了44%)

技巧

  • M根據輸入拆分成餓了M個片段,Reduce拆分成R個片段執行。理想情況下M+R應當遠大於worker數目(有利於動態負載均衡),但事實上Master需要執行O(M+R)次排程,儲存O(M*R)個狀態,因此要考慮Master負載
  • 移動計算,本地計算或同一個交換機類通訊,儘量減少網路傳輸
  • 通過寫臨時檔案,rename臨時檔案避免讀到中間狀態的檔案資料
  • partition函式:通過hash(key) mod R 進行負載均衡。也可以使用hash(hostname(urlkey))來保證相同主機的key落到同一個分割槽
  • Combiner函式:可選,是否先進行本地合併,本質等同於Reduce函式,減少網路傳輸
  • 自定義輸入輸出型別:輸入輸出的型別,自定義資料來源,資料分割方式,輸出同理
  • 跳過損壞的記錄:MapReduce設定了訊號函式捕獲記憶體段異常和匯流排錯誤,通過全域性儲存記錄序號。如果特定記錄不止失敗一次,則master標記該記錄需要跳過,並在下次重新執行Map、Reduce任務時跳過
  • standalone本地除錯:本地順序執行MapReduce任務,方便除錯
  • 狀態資訊:監控各種執行狀態。已經完成多少任務、有多少任務在處理、輸入位元組數、輸出位元組數、處理百分比等等
  • 自定義計數器:統計不同事件發生的次數。比如想檢視當前已經處理了多少個單次,這些計數器的值通過worker到master的ping包中傳遞

總結

  1. MapReduce封裝了平行計算、容錯、資料本地化優化、負載均衡等技術難點細節
  2. 大量不同型別的問題都可以通過MapReduce解決(抽象的計算模型)
  3. 數千臺機器的大型叢集上執行MapReduce
  4. 約束式程式設計使得並行和分散式計算非常容易,也易於構建容錯的計算資源
  5. 網路頻寬是稀缺資源,需大量針對性優化,從本地磁碟讀取(移動計算)
  6. 多次執行相同任務來減少效能緩慢的機器帶來的負面影響

疑惑的解答:

  • 為什麼是Map、Reduce函式,能數學上證明所有問題能分解成Map、Reduce任務嗎?
  1. 這麼做只是起源於函數語言程式設計,又能解決Google內部的計算問題。
  2. 提供最簡單的抽象,能極大的簡化分散式系統的平行計算、容錯等設計
  • MapReduce為什麼具有劃時代的意義?(當時也就Oracle設計了最多 32臺機器的並行資料庫)

數千臺廉價機器的大規模平行計算,Google證明了這條路的可行性,設計了完整的容錯方案

  • MapReduce為什麼還需要我寫程式碼,提交jar包,這麼“難”使用?
  1. 當時只是Google為了解決一些很簡單的計算任務,並且主要時為了隱藏了分散式實現的細節,因此這個使用方式以及“滿足”了Google的場景
  2. 目前其實各種查詢引擎已經遮蔽了這些細節,無需再關注實際如何寫MapReduce
  • MapReduce執行流程為什麼是這樣的?為什麼必須按key排序?

因為當初Google的內部問題大部分需要排序,Doug Cutting就是看論文照著抄的

SQL轉MapReduce案例

Group By案例
select rank, isonline, count(*) from city group by rank, isonline;

Distinct案例
select dealid, count(distinct uid) num from order group by dealid;

JOIN案例
select u.name, o.orderid from order o join user u on o.uid = u.uid;

Hadoop MapReduce


參考資料:
https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html