1. 程式人生 > >《MapReduce: Simplified Data Processing on Large Clusters》論文研讀

《MapReduce: Simplified Data Processing on Large Clusters》論文研讀

# MapReduce 論文研讀 **說明**:本文為論文 **《MapReduce: Simplified Data Processing on Large Clusters》** 的個人理解,難免有理解不到位之處,歡迎交流與指正 。 **論文地址**:[MapReduce Paper](https://github.com/XutongLi/Learning-Notes/blob/master/Distributed_System/Paper_Reading/MapReduce/mapreduce.pdf) *** ## 1. MapReduce 程式設計模型 **MapReduce** 是 Google 提出的一種用於處理和生成大資料集的 **程式設計模型** ,具象地可以理解成一個 **框架** 。 該框架含有兩個由使用者來實現的介面:`map` 和 `reduce` ,`map` 函式接收一個鍵值對,生成一箇中間鍵值對集合,*MapReduce* 框架會將所有共用一個鍵的值組合在一起並傳遞給 `reduce` 函式,`reduce` 函式接收此中間鍵以及該鍵的值的集合,將這些值合併在一起,生成一組更小的值的集合 。 該程式設計模型中,資料形式變換可由以下模式表示: ``` map: (k1, v1) -> list(k2, v2) reduce: (k2, list(v2)) -> list(v3) ``` **注**:論文中該模式第二行表示為 `reduce: (k2, list(v2)) -> list(v2)` ,個人認為由於通常情況下 `reduce` 會對 `list` 做一些處理(特殊情況下不做任何處理,即 *reduce* 為恆等函式),生成一些不同的值,所以用 `list` 進行表示可以區分處理前後的差異,更具一般化 。 *** ## 2. 示例:文件中單詞計數 論文中給出了 *MapReduce* 的經典使用示例,即 **統計文件中每個單詞出現次數** ( *word count* 任務 ),通過此示例可以直觀瞭解到 *MapReduce* 的使用方法 。 由使用者實現的 `map` 和 `reduce` 函式的虛擬碼為: ```c++ map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w,"1"); reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result)); ``` 通過上述虛擬碼可以看到: - 對於 `map` 函式,輸入一個鍵值對, *key* 為檔名,*value* 為檔案內容,它對檔案中每一個單詞都生成 **中間鍵值對** `` ,最後返回的內容為一個鍵值對的集合,表示為 `list(<'cat', '1'>, <'dog', '1'>, ..., <'cat', '1'>, <'pig', '1'>)` - 對於 `reduce` 函式,輸入一個鍵值對,*key* 為一個單詞 ,*value* 為該單詞對應的計數的列表,即 `list('1', '1', '1', ..., '1')` ,列表中 `'1'` 的個數即為文件中該單詞出現的次數,最後將單詞出現的次數返回 - 從 `list(<'cat', '1'>, <'dog', '1'>, ..., <'cat', '1'>, <'pig', '1'>)` 轉化為 `<'cat', list('1', '1', '1', ..., '1')>` 的步驟是由 *MapReduce* 框架來執行的 上述過程可以圖示為: ![](https://img2020.cnblogs.com/blog/2035097/202007/2035097-20200703170452432-430049037.jpg) 論文附錄 *A* 有由 *C++* 實現的針對文件詞計數任務的 *map* 函式、*reduce* 函式 以及 呼叫兩介面的完整程式程式碼,在此不做詳述 。 *** ## 3. Google 的 MapReduce 實現 *MapReduce* 模型可以有多種不同的實現方式,論文主要介紹了一種在 *Google* 內部廣泛使用的計算環境下(通過乙太網交換機連線,並由商用伺服器所組成的大型叢集)使用的 *MapReduce* 實現 。 ### 3.1 執行流程 ![](https://img2020.cnblogs.com/blog/2035097/202007/2035097-20200703170502368-709259845.png) 上圖為此 *MapReduce* 框架實現的示意圖,下文基於此圖對 *MapReduce* 的執行過程進行描述,描述的序號與圖中的序號相對應(這部分內容論文描述比較詳細,所以以翻譯為主,穿插個人理解以及補充後文中的優化細節): 1. *MapReduce* 庫會先把檔案切分成 `M` 個片段( 每個大小為 *16MB~64MB* ),儲存在 **GFS 檔案系統** ,接著,它會在叢集中啟動多個 **程式副本** 。 2. 這些程式副本中,一個為 `master` ,剩餘為 `worker` ,*master* 對 *worker* 進行任務分配,共有 `M` 個 *map* 任務以及 `R` 個 *reduce* 任務( *M* 同時為檔案片段數 , *R* 由使用者指定),*master* 會給每個空閒的 *worker* 分配一個 *map* 任務或者一個 *reduce* 任務 。 3. 被分配了 *map* 任務的 *worker* 會讀取相關的輸入資料片段,這些資料片段一般位於該 *worker* 所在的伺服器上( *master* 排程時會優先使 *map* 任務執行在儲存有相關輸入資料的伺服器上,通過這種 **本地執行** 的方式降低伺服器間網路通訊,節約網路頻寬 )。它會解析出輸入資料中的 **鍵值對** ,並將它們傳入使用者定義的 *Map* 函式中,*Map* 函式所生成的 **中間鍵值對** 會被快取在記憶體中 。( 要將 *map* 任務和使用者定義的 *Map* 函式區分開來,*map* 任務包含了一些前置處理以及 *Map* 函式的執行 ,*reduce* 任務和 *Reduce* 函式同理 ) 4. 每隔一段時間,被快取的中間鍵值對會被寫入到本地硬碟,並通過分割槽函式(一般是雜湊後取模)分到 *R* 個區域內 。這些被快取的鍵值對在本地硬碟的位置會被傳回 *master* ,*master* 負責將這些位置轉發給執行 *reduce* 任務的 *worker* 。 5. 所有 *map* 任務執行結束後,*master* 才開始分發 *reduce* 任務 。當某個執行 *reduce* 任務的 *worker* 從 *master* 獲取到了這些位置資訊,該 *worker* 就會通過 **RPC** 的方式從儲存了對應快取中間資料的 *map workers* 的本地硬碟中讀取資料 ( 輸入一個 *reduce* 任務中的中間資料會產生自所有 *map* 任務 )。當一個 *reduce worker* 讀完所有中間資料後,會 **根據中間鍵進行排序,使得具有相同中間鍵的資料可以聚合在一起** 。(需要排序是因為中間 *key* 的數量一般遠大於 *R* ,許多不同 *key* 會對映到同一個 *reduce* 任務中 )如果中間資料的資料量太大而無法放到記憶體中,需要使用外部排序 。 6. *reduce worker* 會對排序後的中間資料進行遍歷,對於每個唯一的中間鍵,將該中間鍵和對應的中間值的集合傳入使用者提供的 *Reduce* 函式中,*Reduce* 函式生成的輸出會被追加到這個 *reduce* 任務分割槽的輸出檔案中 ( 即一個 *reduce* 任務對應一個輸出檔案,即 *R* 個輸出檔案,儲存在 *GFS* 檔案系統,需要的話可作為另一個 *MapReduce* 呼叫的輸入 )。 7. 當所有的 *map* 和 *reduce* 任務完成後,*master* 會喚醒使用者程式 。此時,使用者程式會結束對 *MapReduce* 的呼叫 。 ### 3.2 容錯 #### 3.2.1 Woker 故障 *master* 會週期性地 *ping* 每個 *worker* ,若在一定時間內無法收到某個 *worker* 的響應,那麼 *master* 將該 *worker* 標記為 *fail* : - 此 *worker* 上 **完成** 的所有 *map* 任務都被重設為 *idle* 狀態,交由別的 *worker* 去執行這些 *map* 任務 - 此 *worker* 上 **正在執行** 的 *map* 任務或 *reduce* 任務重設為 *idle* 狀態,並等待重新排程 該 *worker* 上完成的 *map* 任務必須重新執行,因為 *map* 任務資料結果儲存在 *worker* 的本地硬碟中,*worker* 無法訪問了,則輸出資料也無法訪問;該 *worker* 上完成的 *reduce* 任務不需要重新執行,因為輸出結果已儲存在全域性檔案系統中 。 #### 3.2.2 Master 故障 目前的實現選擇中斷 *MapReduce* 計算,客戶端可檢查該 *master* 的狀態,並根據需要重新執行 *MapReduce* 操作 。 ### 3.3 資料儲存位置 此模式是為了 **節約網路頻寬** 。 將輸入資料( 由 *GFS* 系統管理 )儲存在叢集中伺服器的本地硬碟上,*GFS* 將每個檔案分割為大小為 *64MB* 的 *Block* ,並且對每個 *Block* 儲存多個副本(通常3個副本,分散在不同機器上)。*master* 排程 *map* 任務時會考慮輸入資料檔案的位置資訊,儘量在包含該相關輸入資料的拷貝的機器上執行 *map* 任務 。若任務失敗,*master* 嘗試在儲存輸入資料副本的鄰近機器上執行 *map* 任務,以此來節約網路頻寬 。 ### 3.4 備用任務 此模式是為了緩解 **straggler (掉隊者) 問題** ,即 :一臺機器花費了異常多的時間去完成 **最後幾個** *map* 或 *reduce* 任務,導致整個計算時間延長的問題 。可能是由於硬碟問題,可能是 *CPU* 、記憶體、硬碟和網路頻寬的競爭而導致的 。 解決此問題的方法是:當一個 *MapReduce* 計算 **接近完成** 時,*master* **為正在執行中的任務執行 備用任務** ,當此任務完成時,無論是主任務還是備用任務完成的,都將此任務標記為完成 。這種方法雖然多使用了一些計算資源,但是有效降低了 *MapReduce Job* 的執行時間 。 ### 3.5 Combiner 函式 某些情況下,每個 *map* 任務生成的中間 *key* 會有明顯重複,可使用 **Combiner 函式** 在 *map worker* 上將資料進行部分合並,再傳往 *reduce worker* 。 *Combiner* 函式 和 *Reduce* 函式的實現程式碼一樣,區別在於兩個函式輸出不同,*Combiner* 函式的輸出被寫入中間檔案,*Reduce* 函式的輸出被寫入最終輸出檔案 。 這種方法可以提升某些型別的 *MapReduce* 任務的執行速度( 如 *word count* 任務)。 ### 3.6 臨時中間檔案 對於有伺服器故障而可能導致的 **reduce 任務可能讀到部分寫入的中間檔案** 的問題 。可以使用 **臨時中間檔案** ,即 *map* 任務將運算結果寫入臨時中間檔案,一旦該檔案完全生成完畢,以原子的方式對該檔案重新命名 。 *** ## 4. MapReduce 的優點 - 適合PB級以上海量資料的離線處理 - 隱藏了並行化、容錯、資料分發以及負載均衡等細節 - 允許沒有分散式或並行系統經驗的程式設計師輕鬆開發分散式任務程式 - 伸縮性好,使用更多的伺服器可以獲得更多的吞吐量 *** ## 5. MapReduce 的限制 - 不擅長實時計算 - 無法進行流式計算,因為 *MapReduce* 的輸入資料是靜態的 - 無多階段管道,對於先後依賴的任務,*MapReduce* 必須把資料寫入硬碟,再由下一個 *MapReduce* 任務呼叫這些資料,造成了多餘的磁碟 I/O *** ## 6. 相關問題總結 ### 6.1 MapReduce 如何節約網路頻寬 1. 叢集中所有伺服器既執行 *GFS* ,也執行 *MapReduce* 的 *worker* 2. *master* 排程時會優先使 *map* 任務執行在儲存有相關輸入資料的伺服器上 3. *reduce worker* 直接通過 *RPC* 從 *map worker* 獲取中間資料,而不是通過 *GFS* ,因此中間資料只需要進行一次網路傳輸 4. *R* 遠小於中間 *key* 的數量,因此中間鍵值對會被劃分到一個擁有很多 *key* 的檔案中,傳輸更大的檔案( 相對於一個檔案擁有更少的 *key* )效率更高 ### 6.2 MapReduce 如何獲得好的負載均衡 1. 通過備用任務緩解 *straggler* 問題 2. 使 *task* 數遠多於 *worker* 數,*master* 將空閒任務分給已經完成任務的 *worker* *** ## 7. 現狀 - *MapReduce* 已被 *Flume / FlumeJava* 所取代 - *GFS* 已被 *Colossus* 和 *BigTable* 所取