1. 程式人生 > 實用技巧 >MapReduce 模式、演算法和用例(一)

MapReduce 模式、演算法和用例(一)

摘自:http://my.oschina.net/juliashine/blog/86063?from=20121111

本文譯自Mapreduce Patterns, Algorithms, and Use Cases


在這篇文章裡總結了幾種網上或者論文中常見的MapReduce模式和演算法,並系統化的解釋了這些技術的不同之處。所有描述性的文字和程式碼都使用了標準hadoop的MapReduce模型,包括Mappers, Reduces, Combiners, Partitioners,和 sorting。如下圖所示。

基本MapReduce模式

計數與求和

問題陳述:

有許多文件,每個文件都有一些欄位組成。需要計算出每個欄位在所有文件中的出現次數或者這些欄位的其他什麼統計值。例如,給定一個log檔案,其中的每條記錄都包含一個響應時間,需要計算出平均響應時間。

解決方案:

讓我們先從簡單的例子入手。在下面的程式碼片段裡,Mapper每遇到指定詞就把頻次記1,Reducer一個個遍歷這些詞的集合然後把他們的頻次加和。

 1 class Mapper  2  method Map(docid id, doc d)  3  for all term t in doc d do  4  Emit(term t, count 1)  5   6 class Reducer  7  method Reduce(term t, counts [c1, c2,...])  8  sum = 0  9  for all count c in [c1, c2,...] 
do 10 sum = sum + c 11 Emit(term t, count sum)

這種方法的缺點顯而易見,Mapper提交了太多無意義的計數。它完全可以通過先對每個文件中的詞進行計數從而減少傳遞給Reducer的資料量:

1 class Mapper 2  method Map(docid id, doc d) 3  H = new AssociativeArray 4  for all term t in doc d do 5  H{t} = H{t} + 1 6  for all term t in H do 7  Emit(term t, count H{t})

如果要累計計數的的不只是單個文件中的內容,還包括了一個Mapper節點處理的所有文件,那就要用到Combiner了:

 1  class Mapper  2  method Map(docid id, doc d)  3  for all term t in doc d do  4  Emit(term t, count 1)  5   6  class Combiner  7  method Combine(term t, [c1, c2,...])  8  sum = 0  9  for all count c in [c1, c2,...] do 10  sum = sum + c 11  Emit(term t, count sum) 12  13  class Reducer 14  method Reduce(term t, counts [c1, c2,...]) 15  sum = 0 16  for all count c in [c1, c2,...] do 17  sum = sum + c 18  Emit(term t, count sum)

應用:Log 分析, 資料查詢

整理歸類

問題陳述:

有一系列條目,每個條目都有幾個屬性,要把具有同一屬性值的條目都儲存在一個檔案裡,或者把條目按照屬性值分組。 最典型的應用是倒排索引。

解決方案:

解決方案很簡單。 在 Mapper 中以每個條目的所需屬性值作為 key,其本身作為值傳遞給 Reducer。 Reducer 取得按照屬性值分組的條目,然後可以處理或者儲存。如果是在構建倒排索引,那麼 每個條目相當於一個詞而屬性值就是詞所在的文件ID。

應用:倒排索引, ETL

過濾 (文字查詢),解析和校驗

問題陳述:

假設有很多條記錄,需要從其中找出滿足某個條件的所有記錄,或者將每條記錄傳換成另外一種形式(轉換操作相對於各條記錄獨立,即對一條記錄的操作與其他記錄無關)。像文字解析、特定值抽取、格式轉換等都屬於後一種用例。

解決方案:

非常簡單,在Mapper 裡逐條進行操作,輸出需要的值或轉換後的形式。

應用:日誌分析,資料查詢,ETL,資料校驗

分散式任務執行

問題陳述:

大型計算可以分解為多個部分分別進行然後合併各個計算的結果以獲得最終結果。

解決方案: 將資料切分成多份作為每個 Mapper 的輸入,每個Mapper處理一份資料,執行同樣的運算,產生結果,Reducer把多個Mapper的結果組合成一個。

案例研究: 數字通訊系統模擬

像 WiMAX 這樣的數字通訊模擬軟體通過系統模型來傳輸大量的隨機資料,然後計算傳輸中的錯誤機率。 每個 Mapper 處理樣本 1/N 的資料,計算出這部分資料的錯誤率,然後在 Reducer 裡計算平均錯誤率。

應用:工程模擬,數字分析,效能測試

排序

問題陳述:

有許多條記錄,需要按照某種規則將所有記錄排序或是按照順序來處理記錄。

解決方案:簡單排序很好辦 – Mappers 將待排序的屬性值為鍵,整條記錄為值輸出。 不過實際應用中的排序要更加巧妙一點, 這就是它之所以被稱為MapReduce 核心的原因(“核心”是說排序?因為證明Hadoop計算能力的實驗是大資料排序?還是說Hadoop的處理過程中對key排序的環節?)。在實踐中,常用組合鍵來實現二次排序和分組。

MapReduce 最初只能夠對鍵排序, 但是也有技術利用可以利用Hadoop 的特性來實現按值排序。想了解的話可以看這篇部落格

按照BigTable的概念,使用 MapReduce來對最初資料而非中間資料排序,也即保持資料的有序狀態更有好處,必須注意這一點。換句話說,在資料插入時排序一次要比在每次查詢資料的時候排序更高效。

應用:ETL,資料分析

非基本 MapReduce 模式

迭代訊息傳遞 (圖處理)

問題陳述:

假設一個實體網路,實體之間存在著關係。 需要按照與它比鄰的其他實體的屬性計算出一個狀態。這個狀態可以表現為它和其它節點之間的距離, 存在特定屬性的鄰接點的跡象, 鄰域密度特徵等等。

解決方案:

網路儲存為系列節點的結合,每個節點包含有其所有鄰接點ID的列表。按照這個概念,MapReduce 迭代進行,每次迭代中每個節點都發訊息給它的鄰接點。鄰接點根據接收到的資訊更新自己的狀態。當滿足了某些條件的時候迭代停止,如達到了最大迭代次數(網路半徑)或兩次連續的迭代幾乎沒有狀態改變。從技術上來看,Mapper 以每個鄰接點的ID為鍵發出資訊,所有的資訊都會按照接受節點分組,reducer 就能夠重算各節點的狀態然後更新那些狀態改變了的節點。下面展示了這個演算法:

 1 class Mapper  2  method Map(id n, object N)  3  Emit(id n, object N)  4  for all id m in N.OutgoingRelations do  5  Emit(id m, message getMessage(N))  6   7 class Reducer  8  method Reduce(id m, [s1, s2,...])  9  M = null 10  messages = [] 11  for all s in [s1, s2,...] do 12  if IsObject(s) then 13  M = s 14  else // s is a message 15  messages.add(s) 16  M.State = calculateState(messages) 17  Emit(id m, item M)

一個節點的狀態可以迅速的沿著網路傳全網,那些被感染了的節點又去感染它們的鄰居,整個過程就像下面的圖示一樣: