1. 程式人生 > 其它 >【從零開始分散式】1. MapReduce

【從零開始分散式】1. MapReduce

背景

MapReduce 的提出和 Google 其本身的業務特點有很強的關聯。Google 以搜尋引擎和廣告業務為核心,有很多諸如爬蟲處理、網頁請求日誌、倒排索引等特定的大量資料的大規模計算業務,儘管這些任務的實現邏輯大多都是非常簡單直接的,但由於資料規模的龐大,往往需要在成百上千臺不同的機器上進行並行資料存取和計算,才能在合理的時間內得到結果。

為了簡化分散式程式設計的難度,Google 基於以 Lisp 為代表的函數語言程式設計語言和向量程式語言思想,提出了 MapReduce 框架,將原本複雜的分散式程式設計任務通過 MapReduce 框架進行抽象簡化,隱去了並行化、故障容錯、資料分配、負載均衡等步驟,使得程式設計師僅需要根據具體任務設計 Map 和 Reduce 這兩個函式,從而,使得分散式任務的開發不再依賴於分散式系統的專家,極大地簡化了分散式任務開發的難度。

模型框架及經典任務

Map 函式:在輸入的 <K, V> 對上應用此函式,得到一箇中間過程的 <K, V> 對集合,並且把相同 key 值對應的不同 value 打包,傳遞給 Reduce 函式。相當於可以把這個 <K, V> 的 map 看成是一個 multiset,每次給 Reduce 函式傳遞一個 key 對應的 set 。

Reduce 函式:接收 Map 計算後的中間過程結果,合併中間資料,得到最終的結果。

任務 具體描述 Map Reduce
單詞計數 統計一系列大文件中的每個單詞的出現次數。 輸出單個文件切片中每個單詞的出現次數。 把 Map 產生的每一個特定的單詞的出現數累加。
分散式 Grep Grep ( Global Regular Expression Print ) 使用正則表示式搜尋文字,並把匹配的行打印出來。 輸出匹配某個正則表示式的一行。 把 Map 產生的中間資料複製到輸出,相當於一個恆等函式。
統計 URL 訪問頻率 類似於單詞計數,統計一系列處理日誌中某個網頁的訪問請求頻率。 處理網路日誌輸入,對每個訪問輸出 <URL, 1>。 把 Map 產生的每個相同 URL 的值都統計在一起,並輸出 <URL, total count>。
翻轉網頁連結圖 給定一個有向圖,其中的每條邊 <u, v> 表示網站 u 可以連結到網頁 v。對每一個網頁 v,輸出可以連結到該 v 的所有網站 u。 對於每個輸入的 <source, target>,翻轉輸出 <target, source>。 把 Map 產生的所有的 target 相同的 source 列表合併,輸出 <target, list(source)>。
主機術語向量 術語向量是一個或多個文件中最重要的詞語組成的 <word, frequency> 對。 對於每個輸入文件,提取 hostname, 並生成 <hostname, term vector>。 把 Map 產生的指定主機的所有文件的術語向量加在一起,並丟棄低頻詞,生成最終的 <hostname, term vector>。
倒排索引 搜尋引擎需要將形如 key→value 的正向索引重構為 value→key 的倒排索引。 處理每個文件,生成 <word, document ID>。 接收所有的 key-value 對,並且給定的 word,對其所有的 document ID 列表進行排序,輸出 <document, list(document ID>。
分散式排序 即根據給定的規則進行大資料排序。 從每個記錄中提取 key 值,並生成 <key, record>。 接受所有的 key-value 對,不修改其內容,把相同 key 的 pairs 給同一個 reduce 進行處理。

Hadoop 對於原始輸入到 Map 的處理預設按照 WordCount 任務進行,即給定大文字,Hadoop會將文字切片,然後按照 <片內偏移,文字> 的形式送到 Map 函式。如需修改可以重設引數,或者重寫對輸入的處理。

實現

這裡只討論論文中提到的實現。比較重要的背景有:

  1. 網路頻寬較低,需要儘量減少網路傳輸 ===> 考慮讓處理任務的工作機和儲存原始資料的工作機儘量是同一個,或者距離比較近
  2. 叢集龐大,所以機器故障很常見 ===> 需要容錯處理
  3. 儲存設施便宜,因此管理儲存資料的內部分散式檔案系統通過複製拷貝的方法來提高系統的可靠性 ===> 也是之後系統中 master 機故障的處理方法之一
  4. 使用者將工作提交到排程系統,每個工作都有一系列任務,由排程系統安排可用的機器來執行 ===> 整個框架的基礎

下面來看一下 Google 對於整個系統流程的設計:

Figure 1. Execution Overview

整個系統流程也是相對比較簡單直接的。使用者通過介面將工作提交到叢集,叢集中的 Master 機會將該工作的輸入分配到 M 個空閒的工作機執行對應的 Map 任務。執行 Map 任務的工作機(後簡稱 Map 機)會從內建的分散式檔案系統中讀取對應的輸入切片,解析出 key-value 對後送到使用者自定義的 Map 函式中進行進一步的計算處理,將產生的中間結果會被劃分成 R 塊快取在該工作機本地,並通知 Master 機這些資料在 Map 機上的對應位置、大小等資訊,Master 機再挑選 R 個空閒的工作機去執行後續的 Reduce 任務。每個執行 Reduce 任務的工作機 (後簡稱 Reduce 機)會遠端讀取這些中間結果,並且在讀取完所有分配給自己處理的中間結果之後,Reduce 機會將這些資料按照其 key 值排序打包(通常需要外排序)。這個處理完的結果就可以直接被使用者定義的 Reduce 函式使用,計算出最終的結果。雖然這個時候所有的結果仍然分佈在叢集的各個工作機之中,但由於大多數分散式計算出的結果往往會成為下一個分散式計算的輸入,所以把檔案合併的操作往往是不必要的。因此,我們只需要 Master 機去喚醒一下之前沉睡的使用者程序,並將這些檔案(或檔案地址)返回給使用者即可。

容錯處理

  • 工作機: Master 機每隔一段時間就會 ping 一下每個工作機。如果工作機在一段時間內都沒有響應,就會認為工作機故障了。由於只有 Reduce 機的最終計算結果會儲存在全域性的檔案系統中,其他計算任務和中間過程結果都是存在本地的,因此,只有完成了 Reduce 任務的工作機發生的故障可以忽略,其他故障一旦發生,該工作機被分配的任務必須被重新執行,並將該工作機重置為空閒狀態。需要注意的是,如果一個 Map 任務先被分配到了工作機 A,後來由於發生故障,由工作機 B 重新執行了 Map 任務的話,所有尚未從工作機 A 讀取到資料的 Reduce 機都必須從工作機 B 中讀取資料(已經完成讀取的可以忽略)。也因此,MapReduce 可以適應大規模的工作機故障,因為 Master 機會不斷地重新執行那些暫時不能訪問的工作機上正在執行的任務來推進整個工作的進度,直到整個工作被完成。
  • Master 機:由於 Master 機是單機,因此論文作這認為 Master 機宕機的概率比較小,給出的一個處理方法是增加多個檢查點,當 master 掛了的時候,從上一個檢查點恢復。所以一旦 Master 機徹底掛了的話,整個 MapReduce 計算也會終止。如果想避免這個情況,或許可以考慮由其他 client 機來檢查這個 Master 機的狀態,並且在 Master 機掛了的時候嘗試恢復或重新執行 MapReduce。