MapReduce技術的初步瞭解與學習
今天咱們學習下MapReduce模型。由於是本人是初次接觸,不是很瞭解。所以,有任何問題,還望各位不吝批評指正。本文中,我會先用最最通俗的語言闡述什麼是MapReduce,然後再摘自Google MapReduce中文版上的一些內容,以期對這個模型有個初步的瞭解與認識。ok,閒不多說,下面進入正題。
前言
海量資料處理也許是許多程式設計師需要面對的難題。儘管我們的計算機硬體越來越強大,但是相比於網際網路中的海量資料來說,我們的個人計算機處理能力實在是微乎其微。本部落格內前期已經對海量資料處理問題從演算法層面上做出了一些總結,本文將著重介紹一種常用的處理海量資料的程式設計模型,從架構的角度解析這一問題。
從概念說起
相信讀計算機的沒有人不知道“分散式計算”與“雲端計算”這兩個名詞。什麼是分散式?簡單的說就是把一件龐大的任務拋給n 多個計算機去處理。“雲端計算”依我的解釋就是分散式計算的一種,由於我沒有仔細研究過雲端計算,就不在本文中談論“雲”這個話題了。 MapReduce 就是一種簡單的分散式計算模型。既然是分散式計算,大家猜都能猜到 MapReduce 的功能:運用 n 多臺計算機處理同一堆海量資料以此得到最終結果。
按照MapReduce 的作者所說,這個模型的靈感來自於 Lisp 與其他函式語言的 map 和 reduce 表示。不懂什麼叫“函式語言”沒關係,這個概念不用管它。其實從名字上來看,我們就可以知道
另一種說法是:Map 函式把大資料集進行分解操作得到兩個或者更多的小“桶”。每臺處理器對分割出來的每個桶進行操作,獲取一組中間值,而 Reduce 函式是把這些中間結果通過一定的函式進行處理來獲取最終的答案。總結一下,我認為 Map 是一個“分”的過程,它把海量資料分割成了若干小塊以分給若干臺處理器去處理,而
舉例說明
相信幾乎沒有人喜歡看一堆只寫概念的文章。本文將通過例項來充分解釋清楚MapReduce 這個模型。
先從最最簡單的例子說起吧。假設我們有一組資料:1,2,3, …,100。求這一組資料的平方和。現在我們用 MapReduce 這個模型解決這一個問題。
首先我們把這組資料分成100 份,交由 100 臺處理器去處理。每一臺處理器只做一件事,就是把自己要處理的資料平方一下。這樣一來,最初的那組數 [1,2,3, …,100]就被對映成了 [1,4,9,16, …,10000]了。這就是所謂的 Map 操作。而 Reduce 操作呢? Reduce 操作就是把對映後得到的這 100 個新的資料累加咯,這不就得到結果了嗎?
這個例子太簡單了吧,不過它的確說明了MapReduce 的本質思想:先把要處理的資料分成 n 多個小塊,然後交由 n 多個處理器處理,最後再通過一定的手段進行資料彙總,得出答案。
換個例子吧。這次這個例子是幾乎所有講MapReduce 的文章都會講到的一個例子:單詞計數。假設有 3 篇文章,分別為:
Paper1: We study algorithm.
Paper2: We share our thinking.
Paper3: This team shares thinking of algorithm.
根據MapReduce 的模型,將這 3 篇文章交由 3 臺處理器單獨處理:
Map操作:
處理之後對中間結果進行一定處理:(注意:此步驟仍然是用多臺處理器分別完成)
最後進行彙總,即Reduce操作:
這樣,3 篇文章的所有詞頻不就處理完了嗎?還是很簡單吧。
Google MapReduce中文版
上文用最最簡單的語言闡述了什麼是MapReduce模型,以下內容全部摘自Google MapReduce中文版 。有任何問題,歡迎指正。
摘要
MapReduce是一個程式設計模型,也是一個處理和生成超大資料集的演算法模型的相關實現。使用者首先建立一個Map函式處理一個基於key/value pair的資料集合,輸出中間的基於key/value pair的資料集合;然後再建立一個Reduce函式用來合併所有的具有相同中間key值的中間value值。現實世界中有很多滿足上述處理模型的例子,本論文將詳細描述這個模型。
MapReduce架構的程式能夠在大量的普通配置的計算機上實現並行化處理。這個系統在執行時只關心:如何分割輸入資料,在大量計算機組成的叢集上的排程,叢集中計算機的錯誤處理,管理叢集中計算機之間必要的通訊。採用MapReduce架構可以使那些沒有平行計算和分散式處理系統開發經驗的程式設計師有效利用分散式系統的豐富資源。
我們的MapReduce實現執行在規模可以靈活調整的由普通機器組成的叢集上:一個典型的MapReduce計算往往由幾千臺機器組成、處理以TB計算的資料。程式設計師發現這個系統非常好用:已經實現了數以百計的MapReduce程式,在Google的叢集上,每天都有1000多個MapReduce程式在執行。
1、介紹
在過去的5年裡,包括本文作者在內的Google的很多程式設計師,為了處理海量的原始資料,已經實現了數以百計的、專用的計算方法。這些計算方法用來處理大量的原始資料,比如,文件抓取(類似網路爬蟲的程式)、Web請求日誌等等;也為了計算處理各種型別的衍生資料,比如倒排索引、Web文件的圖結構的各種表示形勢、每臺主機上網路爬蟲抓取的頁面數量的彙總、每天被請求的最多的查詢的集合等等。大多數這樣的資料處理運算在概念上很容易理解。然而由於輸入的資料量巨大,因此要想在可接受的時間內完成運算,只有將這些計算分佈在成百上千的主機上。如何處理平行計算、如何分發資料、如何處理錯誤?所有這些問題綜合在一起,需要大量的程式碼處理,因此也使得原本簡單的運算變得難以處理。
為了解決上述複雜的問題,我們設計一個新的抽象模型,使用這個抽象模型,我們只要表述我們想要執行的簡單運算即可,而不必關心平行計算、容錯、資料分佈、負載均衡等複雜的細節,這些問題都被封裝在了一個庫裡面。設計這個抽象模型的靈感來自Lisp和許多其他函式式語言的Map和Reduce的原語。我們意識到我們大多數的運算都包含這樣的操作:在輸入資料的“邏輯”記錄上應用Map操作得出一箇中間key/value pair集合,然後在所有具有相同key值的value值上應用Reduce操作,從而達到合併中間的資料,得到一個想要的結果的目的。使用MapReduce模型,再結合使用者實現的Map和Reduce函式,我們就可以非常容易的實現大規模並行化計算;通過MapReduce模型自帶的“再次執行”(re-execution)功能,也提供了初級的容災實現方案。
這個工作(實現一個MapReduce框架模型)的主要貢獻是通過簡單的介面來實現自動的並行化和大規模的分散式計算,通過使用MapReduce模型介面實現在大量普通的PC機上高效能運算。
第二部分描述基本的程式設計模型和一些使用案例。第三部分描述了一個經過裁剪的、適合我們的基於叢集的計算環境的MapReduce實現。第四部分描述我們認為在MapReduce程式設計模型中一些實用的技巧。第五部分對於各種不同的任務,測量我們MapReduce實現的效能。第六部分揭示了在Google內部如何使用MapReduce作為基礎重寫我們的索引系統產品,包括其它一些使用MapReduce的經驗。第七部分討論相關的和未來的工作。
2、程式設計模型
MapReduce程式設計模型的原理是:利用一個輸入key/value pair集合來產生一個輸出的key/value pair集合。MapReduce庫的使用者用兩個函式表達這個計算:Map和Reduce。
使用者自定義的Map函式接受一個輸入的key/value pair值,然後產生一箇中間key/value pair值的集合。MapReduce庫把所有具有相同中間key值I的中間value值集合在一起後傳遞給reduce函式。
使用者自定義的Reduce函式接受一箇中間key的值I和相關的一個value值的集合。Reduce函式合併這些value值,形成一個較小的value值的集合。一般的,每次Reduce函式呼叫只產生0或1個輸出value值。通常我們通過一個迭代器把中間value值提供給Reduce函式,這樣我們就可以處理無法全部放入記憶體中的大量的value值的集合。
2.1、例子
例如,計算一個大的文件集合中每個單詞出現的次數,下面是虛擬碼段:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, “1″);
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函式輸出文件中的每個詞、以及這個詞的出現次數(在這個簡單的例子裡就是1)。Reduce函式把Map函式產生的每一個特定的詞的計數累加起來。
另外,使用者編寫程式碼,使用輸入和輸出檔案的名字、可選的調節引數來完成一個符合MapReduce模型規範的物件,然後呼叫MapReduce函式,並把這個規範物件傳遞給它。使用者的程式碼和MapReduce庫連結在一起(用C++實現)。附錄A包含了這個例項的全部程式程式碼。
2.2、型別
儘管在前面例子的虛擬碼中使用了以字串表示的輸入輸出值,但是在概念上,使用者定義的Map和Reduce函式都有相關聯的型別:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
比如,輸入的key和value值與輸出的key和value值在型別上推導的域不同。此外,中間key和value值與輸出key和value值在型別上推導的域相同。
(譯者注:原文中這個domain的含義不是很清楚,我參考Hadoop、KFS等實現,map和reduce都使用了泛型,因此,我把domain翻譯成型別推導的域)。
我們的C++中使用字串型別作為使用者自定義函式的輸入輸出,使用者在自己的程式碼中對字串進行適當的型別轉換。
2.3、更多的例子
這裡還有一些有趣的簡單例子,可以很容易的使用MapReduce模型來表示:
-
分散式的Grep:Map函式輸出匹配某個模式的一行,Reduce函式是一個恆等函式,即把中間資料複製到輸出。
-
計算URL訪問頻率:Map函式處理日誌中web頁面請求的記錄,然後輸出(URL,1)。Reduce函式把相同URL的value值都累加起來,產生(URL,記錄總數)結果。
-
倒轉網路連結圖:Map函式在源頁面(source)中搜索所有的連結目標(target)並輸出為(target,source)。Reduce函式把給定連結目標(target)的連結組合成一個列表,輸出(target,list(source))。
-
每個主機的檢索詞向量:檢索詞向量用一個(詞,頻率)列表來概述出現在文件或文件集中的最重要的一些詞。Map函式為每一個輸入文件輸出(主機名,檢索詞向量),其中主機名來自文件的URL。Reduce函式接收給定主機的所有文件的檢索詞向量,並把這些檢索詞向量加在一起,丟棄掉低頻的檢索詞,輸出一個最終的(主機名,檢索詞向量)。
-
倒排索引:Map函式分析每個文件輸出一個(詞,文件號)的列表,Reduce函式的輸入是一個給定詞的所有(詞,文件號),排序所有的文件號,輸出(詞,list(文件號))。所有的輸出集合形成一個簡單的倒排索引,它以一種簡單的演算法跟蹤詞在文件中的位置。
-
分散式排序:Map函式從每個記錄提取key,輸出(key,record)。Reduce函式不改變任何的值。這個運算依賴分割槽機制(在4.1描述)和排序屬性(在4.2描述)。
3、實現
MapReduce模型可以有多種不同的實現方式。如何正確選擇取決於具體的環境。例如,一種實現方式適用於小型的共享記憶體方式的機器,另外一種實現方式則適用於大型NUMA架構的多處理器的主機,而有的實現方式更適合大型的網路連線叢集。
本章節描述一個適用於Google內部廣泛使用的運算環境的實現:用乙太網交換機連線、由普通PC機組成的大型叢集。在我們的環境裡包括:
1.x86架構、執行Linux作業系統、雙處理器、2-4GB記憶體的機器。
2.普通的網路硬體裝置,每個機器的頻寬為百兆或者千兆,但是遠小於網路的平均頻寬的一半。
(譯者注:這裡需要網路專家解釋一下了)
3.叢集中包含成百上千的機器,因此,機器故障是常態。
4.儲存為廉價的內建IDE硬碟。一個內部分散式檔案系統用來管理儲存在這些磁碟上的資料。檔案系統通過資料複製來在不可靠的硬體上保證資料的可靠性和有效性。
5.使用者提交工作(job)給排程系統。每個工作(job)都包含一系列的任務(task),排程系統將這些任務排程到叢集中多臺可用的機器上。
3.1、執行概括
通過將Map呼叫的輸入資料自動分割為M個數據片段的集合,Map呼叫被分佈到多臺機器上執行。輸入的資料片段能夠在不同的機器上並行處理。使用分割槽函式將Map呼叫產生的中間key值分成R個不同分割槽(例如,hash(key) mod R),Reduce呼叫也被分佈到多臺機器上執行。分割槽數量(R)和分割槽函式由使用者來指定。
圖1展示了我們的MapReduce實現中操作的全部流程。當用戶呼叫MapReduce函式時,將發生下面的一系列動作(下面的序號和圖1中的序號一一對應):
- 使用者程式首先呼叫的MapReduce庫將輸入檔案分成M個數據片度,每個資料片段的大小一般從 16MB到64MB(可以通過可選的引數來控制每個資料片段的大小)。然後使用者程式在機群中建立大量的程式副本。(alex:copies of the program還真難翻譯)
- 這些程式副本中的有一個特殊的程式–master。副本中其它的程式都是worker程式,由master分配任務。有M個Map任務和R個Reduce任務將被分配,master將一個Map任務或Reduce任務分配給一個空閒的worker。
- 被分配了map任務的worker程式讀取相關的輸入資料片段,從輸入的資料片段中解析出key/value pair,然後把key/value pair傳遞給使用者自定義的Map函式,由Map函式生成並輸出的中間key/value pair,並快取在記憶體中。
- 快取中的key/value pair通過分割槽函式分成R個區域,之後週期性的寫入到本地磁碟上。快取的key/value pair在本地磁碟上的儲存位置將被回傳給master,由master負責把這些儲存位置再傳送給Reduce worker。
- 當Reduce worker程式接收到master程式發來的資料儲存位置資訊後,使用RPC從Map worker所在主機的磁碟上讀取這些快取資料。當Reduce worker讀取了所有的中間資料後,通過對key進行排序後使得具有相同key值的資料聚合在一起。由於許多不同的key值會對映到相同的Reduce任務上,因此必須進行排序。如果中間資料太大無法在記憶體中完成排序,那麼就要在外部進行排序。
- Reduce worker程式遍歷排序後的中間資料,對於每一個唯一的中間key值,Reduce worker程式將這個key值和它相關的中間value值的集合傳遞給使用者自定義的Reduce函式。Reduce函式的輸出被追加到所屬分割槽的輸出檔案。
- 當所有的Map和Reduce任務都完成之後,master喚醒使用者程式。在這個時候,在使用者程式裡的對MapReduce呼叫才返回。在成功完成任務之後,MapReduce的輸出存放在R個輸出檔案中(對應每個Reduce任務產生一個輸出檔案,檔名由使用者指定)。一般情況下,使用者不需要將這R個輸出檔案合併成一個檔案–他們經常把這些檔案作為另外一個MapReduce的輸入,或者在另外一個可以處理多個分割檔案的分散式應用中使用。
3.2、Master資料結構
Master持有一些資料結構,它儲存每一個Map和Reduce任務的狀態(空閒、工作中或完成),以及Worker機器(非空閒任務的機器)的標識。
Master就像一個數據管道,中間檔案儲存區域的位置資訊通過這個管道從Map傳遞到Reduce。因此,對於每個已經完成的Map任務,master儲存了Map任務產生的R箇中間檔案儲存區域的大小和位置。當Map任務完成時,Master接收到位置和大小的更新資訊,這些資訊被逐步遞增的推送給那些正在工作的Reduce任務。
3.3、容錯
因為MapReduce庫的設計初衷是使用由成百上千的機器組成的叢集來處理超大規模的資料,所以,這個庫必須要能很好的處理機器故障。
worker故障
master週期性的ping每個worker。如果在一個約定的時間範圍內沒有收到worker返回的資訊,master將把這個worker標記為失效。所有由這個失效的worker完成的Map任務被重設為初始的空閒狀態,之後這些任務就可以被安排給其他的worker。同樣的,worker失效時正在執行的Map或Reduce任務也將被重新置為空閒狀態,等待重新排程。
當worker故障時,由於已經完成的Map任務的輸出儲存在這臺機器上,Map任務的輸出已不可訪問了,因此必須重新執行。而已經完成的Reduce任務的輸出儲存在全域性檔案系統上,因此不需要再次執行。
當一個Map任務首先被worker A執行,之後由於worker A失效了又被排程到worker B執行,這個“重新執行”的動作會被通知給所有執行Reduce任務的worker。任何還沒有從worker A讀取資料的Reduce任務將從worker B讀取資料。
MapReduce可以處理大規模worker失效的情況。比如,在一個MapReduce操作執行期間,在正在執行的叢集上進行網路維護引起80臺機器在幾分鐘內不可訪問了,MapReduce master只需要簡單的再次執行那些不可訪問的worker完成的工作,之後繼續執行未完成的任務,直到最終完成這個MapReduce操作。
master失敗
一個簡單的解決辦法是讓master週期性的將上面描述的資料結構(譯者注:指3.2節)
的寫入磁碟,即檢查點(checkpoint)。如果這個master任務失效了,可以從最後一個檢查點(checkpoint)開始啟動另一個master程序。然而,由於只有一個master程序,master失效後再恢復是比較麻煩的,因此我們現在的實現是如果master失效,就中止MapReduce運算。客戶可以檢查到這個狀態,並且可以根據需要重新執行MapReduce操作。
在失效方面的處理機制
(譯者注:原文為”semantics in the presence of failures”)
當用戶提供的Map和Reduce操作是輸入確定性函式(即相同的輸入產生相同的輸出)時,我們的分散式實現在任何情況下的輸出都和所有程式沒有出現任何錯誤、順序的執行產生的輸出是一樣的。
我們依賴對Map和Reduce任務的輸出是原子提交的來完成這個特性。每個工作中的任務把它的輸出寫到私有的臨時檔案中。每個Reduce任務生成一個這樣的檔案,而每個Map任務則生成R個這樣的檔案(一個Reduce任務對應一個檔案)。當一個Map任務完成的時,worker傳送一個包含R個臨時檔名的完成訊息給master。如果master從一個已經完成的Map任務再次接收到到一個完成訊息,master將忽略這個訊息;否則,master將這R個檔案的名字記錄在資料結構裡。
當Reduce任務完成時,Reduce worker程序以原子的方式把臨時檔案重新命名為最終的輸出檔案。如果同一個Reduce任務在多臺機器上執行,針對同一個最終的輸出檔案將有多個重新命名操作執行。我們依賴底層檔案系統提供的重新命名操作的原子性來保證最終的檔案系統狀態僅僅包含一個Reduce任務產生的資料。
使用MapReduce模型的程式設計師可以很容易的理解他們程式的行為,因為我們絕大多數的Map和Reduce操作是確定性的,而且存在這樣的一個事實:我們的失效處理機制等價於一個順序的執行的操作。當Map或/和Reduce操作是不確定性的時候,我們提供雖然較弱但是依然合理的處理機制。當使用非確定操作的時候,一個Reduce任務R1的輸出等價於一個非確定性程式順序執行產生時的輸出。但是,另一個Reduce任務R2的輸出也許符合一個不同的非確定順序程式執行產生的R2的輸出。
考慮Map任務M和Reduce任務R1、R2的情況。我們設定e(Ri)是Ri已經提交的執行過程(有且僅有一個這樣的執行過程)。當e(R1)讀取了由M一次執行產生的輸出,而e(R2)讀取了由M的另一次執行產生的輸出,導致了較弱的失效處理。
3.4、儲存位置
在我們的計算執行環境中,網路頻寬是一個相當匱乏的資源。我們通過儘量把輸入資料(由GFS管理)儲存在叢集中機器的本地磁碟上來節省網路頻寬。GFS把每個檔案按64MB一個Block分隔,每個Block儲存在多臺機器上,環境中就存放了多份拷貝(一般是3個拷貝)。MapReduce的master在排程Map任務時會考慮輸入檔案的位置資訊,儘量將一個Map任務排程在包含相關輸入資料拷貝的機器上執行;如果上述努力失敗了,master將嘗試在儲存有輸入資料拷貝的機器附近的機器上執行Map任務(例如,分配到一個和包含輸入資料的機器在一個switch裡的worker機器上執行)。當在一個足夠大的cluster叢集上執行大型MapReduce操作的時候,大部分的輸入資料都能從本地機器讀取,因此消耗非常少的網路頻寬。
3.5、任務粒度
如前所述,我們把Map拆分成了M個片段、把Reduce拆分成R個片段執行。理想情況下,M和R應當比叢集中worker的機器數量要多得多。在每臺worker機器都執行大量的不同任務能夠提高叢集的動態的負載均衡能力,並且能夠加快故障恢復的速度:失效機器上執行的大量Map任務都可以分佈到所有其他的worker機器上去執行。
但是實際上,在我們的具體實現中對M和R的取值都有一定的客觀限制,因為master必須執行O(M+R)次排程,並且在記憶體中儲存O(M*R)個狀態(對影響記憶體使用的因素還是比較小的:O(M*R)塊狀態,大概每對Map任務/Reduce任務1個位元組就可以了)。
更進一步,R值通常是由使用者指定的,因為每個Reduce任務最終都會生成一個獨立的輸出檔案。實際使用時我們也傾向於選擇合適的M值,以使得每一個獨立任務都是處理大約16M到64M的輸入資料(這樣,上面描寫的輸入資料本地儲存優化策略才最有效),另外,我們把R值設定為我們想使用的worker機器數量的小的倍數。我們通常會用這樣的比例來執行MapReduce:M=200000,R=5000,使用2000臺worker機器。
3.6、備用任務
影響一個MapReduce的總執行時間最通常的因素是“落伍者”:在運算過程中,如果有一臺機器花了很長的時間才完成最後幾個Map或Reduce任務,導致MapReduce操作總的執行時間超過預期。出現“落伍者”的原因非常多。比如:如果一個機器的硬碟出了問題,在讀取的時候要經常的進行讀取糾錯操作,導致讀取資料的速度從30M/s降低到1M/s。如果cluster的排程系統在這臺機器上又排程了其他的任務,由於CPU、記憶體、本地硬碟和網路頻寬等競爭因素的存在,導致執行MapReduce程式碼的執行效率更加緩慢。我們最近遇到的一個問題是由於機器的初始化程式碼有bug,導致關閉了的處理器的快取:在這些機器上執行任務的效能和正常情況相差上百倍。
我們有一個通用的機制來減少“落伍者”出現的情況。當一個MapReduce操作接近完成的時候,master排程備用(backup)任務程序來執行剩下的、處於處理中狀態(in-progress)的任務。無論是最初的執行程序、還是備用(backup)任務程序完成了任務,我們都把這個任務標記成為已經完成。我們調優了這個機制,通常只會佔用比正常操作多幾個百分點的計算資源。我們發現採用這樣的機制對於減少超大MapReduce操作的總處理時間效果顯著。例如,在5.3節描述的排序任務,在關閉掉備用任務的情況下要多花44%的時間完成排序任務。
4、技巧
雖然簡單的Map和Reduce函式提供的基本功能已經能夠滿足大部分的計算需要,我們還是發掘出了一些有價值的擴充套件功能。本節將描述這些擴充套件功能。
4.1、分割槽函式
MapReduce的使用者通常會指定Reduce任務和Reduce任務輸出檔案的數量(R)。我們在中間key上使用分割槽函式來對資料進行分割槽,之後再輸入到後續任務執行程序。一個預設的分割槽函式是使用hash方法(比如,hash(key) mod R)進行分割槽。hash方法能產生非常平衡的分割槽。然而,有的時候,其它的一些分割槽函式對key值進行的分割槽將非常有用。比如,輸出的key值是URLs,我們希望每個主機的所有條目保持在同一個輸出檔案中。為了支援類似的情況,MapReduce庫的使用者需要提供專門的分割槽函式。例如,使用“hash(Hostname(urlkey)) mod R”作為分割槽函式就可以把所有來自同一個主機的URLs儲存在同一個輸出檔案中。
4.2、順序保證
我們確保在給定的分割槽中,中間key/value pair資料的處理順序是按照key值增量順序處理的。這樣的順序保證對每個分成生成一個有序的輸出檔案,這對於需要對輸出檔案按key值隨機存取的應用非常有意義,對在排序輸出的資料集也很有幫助。
4.3、Combiner函式
在某些情況下,Map函式產生的中間key值的重複資料會佔很大的比重,並且,使用者自定義的Reduce函式滿足結合律和交換律。在2.1節的詞數統計程式是個很好的例子。由於詞頻率傾向於一個zipf分佈(齊夫分佈),每個Map任務將產生成千上萬個這樣的記錄<the,1>。所有的這些記錄將通過網路被髮送到一個單獨的Reduce任務,然後由這個Reduce任務把所有這些記錄累加起來產生一個數字。我們允許使用者指定一個可選的combiner函式,combiner函式首先在本地將這些記錄進行一次合併,然後將合併的結果再通過網路傳送出去。
Combiner函式在每臺執行Map任務的機器上都會被執行一次。一般情況下,Combiner和Reduce函式是一樣的。Combiner函式和Reduce函式之間唯一的區別是MapReduce庫怎樣控制函式的輸出。Reduce函式的輸出被儲存在最終的輸出檔案裡,而Combiner函式的輸出被寫到中間檔案裡,然後被髮送給Reduce任務。
部分的合併中間結果可以顯著的提高一些MapReduce操作的速度。附錄A包含一個使用combiner函式的例子。
4.4、輸入和輸出的型別
MapReduce庫支援幾種不同的格式的輸入資料。比如,文字模式的輸入資料的每一行被視為是一個key/value pair。key是檔案的偏移量,value是那一行的內容。另外一種常見的格式是以key進行排序來儲存的key/value pair的序列。每種輸入型別的實現都必須能夠把輸入資料分割成資料片段,該資料片段能夠由單獨的Map任務來進行後續處理(例如,文字模式的範圍分割必須確保僅僅在每行的邊界進行範圍分割)。雖然大多數MapReduce的使用者僅僅使用很少的預定義輸入型別就滿足要求了,但是使用者依然可以通過提供一個簡單的Reader介面實現就能夠支援一個新的輸入型別。
Reader並非一定要從檔案中讀取資料,比如,我們可以很容易的實現一個從資料庫裡讀記錄的Reader,或者從記憶體中的資料結構讀取資料的Reader。
類似的,我們提供了一些預定義的輸出資料的型別,通過這些預定義型別能夠產生不同格式的資料。使用者採用類似新增新的輸入資料型別的方式增加新的輸出型別。
4.5、副作用
在某些情況下,MapReduce的使用者發現,如果在Map和/或Reduce操作過程中增加輔助的輸出檔案會比較省事。我們依靠程式writer把這種“副作用”變成原子的和冪等的(譯者注:冪等的指一個總是產生相同結果的數學運算) 。通常應用程式首先把輸出結果寫到一個臨時檔案中,在輸出全部資料之後,在使用系統級的原子操作rename重新命名這個臨時檔案。
如果一個任務產生了多個輸出檔案,我們沒有提供類似兩階段提交的原子操作支援這種情況。因此,對於會產生多個輸出檔案、並且對於跨檔案有一致性要求的任務,都必須是確定性的任務。但是在實際應用過程中,這個限制還沒有給我們帶來過麻煩。
4.6、跳過損壞的記錄
有時候,使用者程式中的bug導致Map或者Reduce函式在處理某些記錄的時候crash掉,MapReduce操作無法順利完成。慣常的做法是修復bug後再次執行MapReduce操作,但是,有時候找出這些bug並修復它們不是一件容易的事情;這些bug也許是在第三方庫裡邊,而我們手頭沒有這些庫的原始碼。而且在很多時候,忽略一些有問題的記錄也是可以接受的,比如在一個巨大的資料集上進行統計分析的時候。我們提供了一種執行模式,在這種模式下,為了保證保證整個處理能繼續進行,MapReduce會檢測哪些記錄導致確定性的crash,並且跳過這些記錄不處理。
每個worker程序都設定了訊號處理函式捕獲記憶體段異常(segmentation violation)和匯流排錯誤(bus error)。在執行Map或者Reduce操作之前,MapReduce庫通過全域性變數儲存記錄序號。如果使用者程式觸發了一個系統訊號,訊息處理函式將用“最後一口氣”通過UDP包向master傳送處理的最後一條記錄的序號。當master看到在處理某條特定記錄不止失敗一次時,master就標誌著條記錄需要被跳過,並且在下次重新執行相關的Map或者Reduce任務的時候跳過這條記錄。
4.7、本地執行
除錯Map和Reduce函式的bug是非常困難的,因為實際執行操作時不但是分佈在系統中執行的,而且通常是在好幾千臺計算機上執行,具體的執行位置是由master進行動態排程的,這又大大增加了除錯的難度。為了簡化除錯、profile和小規模測試,我們開發了一套MapReduce庫的本地實現版本,通過使用本地版本的MapReduce庫,MapReduce操作在本地計算機上順序的執行。使用者可以控制MapReduce操作的執行,可以把操作限制到特定的Map任務上。使用者通過設定特別的標誌來在本地執行他們的程式,之後就可以很容易的使用本地除錯和測試工具(比如gdb)。
4.8、狀態資訊
master使用嵌入式的HTTP伺服器(如Jetty)顯示一組狀態資訊頁面,使用者可以監控各種執行狀態。狀態資訊頁面顯示了包括計算執行的進度,比如已經完成了多少任務、有多少任務正在處理、輸入的位元組數、中間資料的位元組數、輸出的位元組數、處理百分比等等。頁面還包含了指向每個任務的stderr和stdout檔案的連結。使用者根據這些資料預測計算需要執行大約多長時間、是否需要增加額外的計算資源。這些頁面也可以用來分析什麼時候計算執行的比預期的要慢。
另外,處於最頂層的狀態頁面顯示了哪些worker失效了,以及他們失效的時候正在執行的Map和Reduce任務。這些資訊對於除錯使用者程式碼中的bug很有幫助。
4.9、計數器
MapReduce庫使用計數器統計不同事件發生次數。比如,使用者可能想統計已經處理了多少個單詞、已經索引的多少篇German文件等等。
為了使用這個特性,使用者在程式中建立一個命名的計數器物件,在Map和Reduce函式中相應的增加計數器的值。例如:
Counter* uppercase;
uppercase = GetCounter(“uppercase”);
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, “1″);
這些計數器的值週期性的從各個單獨的worker機器上傳遞給master(附加在ping的應答包中傳遞)。master把執行成功的Map和Reduce任務的計數器值進行累計,當MapReduce操作完成之後,返回給使用者程式碼。
計數器當前的值也會顯示在master的狀態頁面上,這樣使用者就可以看到當前計算的進度。當累加計數器的值的時候,master要檢查重複執行的Map或者Reduce任務,避免重複累加(之前提到的備用任務和失效後重新執行任務這兩種情況會導致相同的任務被多次執行)。
有些計數器的值是由MapReduce庫自動維持的,比如已經處理的輸入的key/value pair的數量、輸出的key/value pair的數量等等。
計數器機制對於MapReduce操作的完整性檢查非常有用。比如,在某些MapReduce操作中,使用者需要確保輸出的key value pair精確的等於輸入的key value pair,或者處理的German文件數量在處理的整個文件數量中屬於合理範圍。
5、效能
本節我們用在一個大型叢集上執行的兩個計算來衡量MapReduce的效能。一個計算在大約1TB的資料中進行特定的模式匹配,另一個計算對大約1TB的資料進行排序。
這兩個程式在大量的使用MapReduce的實際應用中是非常典型的 — 一類是對資料格式進行轉換,從一種表現形式轉換為另外一種表現形式;另一類是從海量資料中抽取少部分的使用者感興趣的資料。
5.1、叢集配置
所有這些程式都執行在一個大約由1800臺機器構成的叢集上。每臺機器配置2個2G主頻、支援超執行緒的Intel Xeon處理器,4GB的實體記憶體,兩個160GB的IDE硬碟和一個千兆乙太網卡。這些機器部署在一個兩層的樹形交換網路中,在root節點大概有100-200GBPS的傳輸頻寬。所有這些機器都採用相同的部署(對等部署),因此任意兩點之間的網路來回時間小於1毫秒。
在4GB記憶體裡,大概有1-1.5G用於執行在叢集上的其他任務。測試程式在週末下午開始執行,這時主機的CPU、磁碟和網路基本上處於空閒狀態。
5.2、GREP
這個分散式的grep程式需要掃描大概10的10次方個由100個位元組組成的記錄,查找出現概率較小的3個字元的模式(這個模式在92337個記錄中出現)。輸入資料被拆分成大約64M的Block(M=15000),整個輸出資料存放在一個檔案中(R=1)。
圖2顯示了這個運算隨時間的處理過程。其中Y軸表示輸入資料的處理速度。處理速度隨著參與MapReduce計算的機器數量的增加而增加,當1764臺worker參與計算的時,處理速度達到了30GB/s。當Map任務結束的時候,即在計算開始後80秒,輸入的處理速度降到0。整個計算過程從開始到結束一共花了大概150秒。這包括了大約一分鐘的初始啟動階段。初始啟動階段消耗的時間包括了是把這個程式傳送到各個worker機器上的時間、等待GFS檔案系統開啟1000個輸入檔案集合的時間、獲取相關的檔案本地位置優化資訊的時間。
5.3、排序
排序程式處理10的10次方個100個位元組組成的記錄(大概1TB的資料)。這個程式模仿TeraSort benchmark[10]。
排序程式由不到50行程式碼組成。只有三行的Map函式從文字行中解析出10個位元組的key值作為排序的key,並且把這個key和原始文字行作為中間的key/value pair值輸出。我們使用了一個內建的恆等函式作為Reduce操作函式。這個函式把中間的key/value pair值不作任何改變輸出。最終排序結果輸出到兩路複製的GFS檔案系統(也就是說,程式輸出2TB的資料)。
如前所述,輸入資料被分成64MB的Block(M=15000)。我們把排序後的輸出結果分割槽後儲存到4000個檔案(R=4000)。分割槽函式使用key的原始位元組來把資料分割槽到R個片段中。
在這個benchmark測試中,我們使用的分割槽函式知道key的分割槽情況。通常對於排序程式來說,我們會增加一個預處理的MapReduce操作用於取樣key值的分佈情況,通過取樣的資料來計算對最終排序處理的分割槽點。
圖三(a)顯示了這個排序程式的正常執行過程。左上的圖顯示了輸入資料讀取的速度。資料讀取速度峰值會達到13GB/s,並且所有Map任務完成之後,即大約200秒之後迅速滑落到0。值得注意的是,排序程式輸入資料讀取速度小於分散式grep程式。這是因為排序程式的Map任務花了大約一半的處理時間和I/O頻寬把中間輸出結果寫到本地硬碟。相應的分散式grep程式的中間結果輸出幾乎可以忽略不計。
左邊中間的圖顯示了中間資料從Map任務傳送到Reduce任務的網路速度。這個過程從第一個Map任務完成之後就開始緩慢啟動了。圖示的第一個高峰是啟動了第一批大概1700個Reduce任務(整個MapReduce分佈到大概1700臺機器上,每臺機器1次最多執行1個Reduce任務)。排序程式執行大約300秒後,第一批啟動的Reduce任務有些完成了,我們開始執行剩下的Reduce任務。所有的處理在大約600秒後結束。
左下圖表示Reduce任務把排序後的資料寫到最終的輸出檔案的速度。在第一個排序階段結束和資料開始寫入磁碟之間有一個小的延時,這是因為worker機器正在忙於排序中間資料。磁碟寫入速度在2-4GB/s持續一段時間。輸出資料寫入磁碟大約持續850秒。計入初始啟動部分的時間,整個運算消耗了891秒。這個速度和TeraSort benchmark[18]的最高紀錄1057秒相差不多。
還有一些值得注意的現象:輸入資料的讀取速度比排序速度和輸出資料寫入磁碟速度要高不少,這是因為我們的輸入資料本地化優化策略起了作用 — 絕大部分資料都是從本地硬碟讀取的,從而節省了網路頻寬。排序速度比輸出資料寫入到磁碟的速度快,這是因為輸出資料寫了兩份(我們使用了2路的GFS檔案系統,寫入複製節點的原因是為了保證資料可靠性和可用性)。我們把輸出資料寫入到兩個複製節點的原因是因為這是底層檔案系統的保證資料可靠性和可用性的實現機制。如果底層檔案系統使用類似容錯編碼[14](erasure coding)的方式而不是複製的方式保證資料的可靠性和可用性,那麼在輸出資料寫入磁碟的時候,就可以降低網路頻寬的使用。
5.4、高效的backup任務
圖三(b)顯示了關閉了備用任務後排序程式執行情況。執行的過程和圖3(a)很相似,除了輸出資料寫磁碟的動作在時間上拖了一個很長的尾巴,而且在這段時間裡,幾乎沒有什麼寫入動作。在960秒後,只有5個Reduce任務沒有完成。這些拖後腿的任務又執行了300秒才完成。整個計算消耗了1283秒,多了44%的執行時間。
5.5、失效的機器
在圖三(c)中演示的排序程式執行的過程中,我們在程式開始後幾分鐘有意的kill了1746個worker中的200個。叢集底層的排程立刻在這些機器上重新開始新的worker處理程序(因為只是worker機器上的處理程序被kill了,機器本身還在工作)。
圖三(c)顯示出了一個“負”的輸入資料讀取速度,這是因為一些已經完成的Map任務丟失了(由於相應的執行Map任務的worker程序被kill了),需要重新執行這些任務。相關Map任務很快就被重新執行了。整個運算在933秒內完成,包括了初始啟動時間(只比正常執行多消耗了5%的時間)。
6、經驗
我們在2003年1月完成了第一個版本的MapReduce庫,在2003年8月的版本有了顯著的增強,這包括了輸入資料本地優化、worker機器之間的動態負載均衡等等。從那以後,我們驚喜的發現,MapReduce庫能廣泛應用於我們日常工作中遇到的各類問題。它現在在Google內部各個領域得到廣泛應用,包括:
-
大規模機器學習問題
-
Google News和Froogle產品的叢集問題
-
從公眾查詢產品(比如Google的Zeitgeist)的報告中抽取資料。
-
從大量的新應用和新產品的網頁中提取有用資訊(比如,從大量的位置搜尋網頁中抽取地理位置資訊)。
-
大規模的圖形計算。
圖四顯示了在我們的原始碼管理系統中,隨著時間推移,獨立的MapReduce程式數量的顯著增加。從2003年早些時候的0個增長到2004年9月份的差不多900個不同的程式。MapReduce的成功取決於採用MapReduce庫能夠在不到半個小時時間內寫出一個簡單的程式,這個簡單的程式能夠在上千臺機器的組成的叢集上做大規模併發處理,這極大的加快了開發和原形設計的週期。另外,採用MapReduce庫,可以讓完全沒有分散式和/或並行系統開發經驗的程式設計師很容易的利用大量的資源,開發出分散式和/或並行處理的應用。
在每個任務結束的時候,MapReduce庫統計計算資源的使用狀況。在表1,我們列出了2004年8月份MapReduce執行的任務所佔用的相關資源。
6.1、大規模索引
到目前為止,MapReduce最成功的應用就是重寫了Google網路搜尋服務所使用到的index系統。索引系統的輸入資料是網路爬蟲抓取回來的海量的文件,這些文件資料都儲存在GFS檔案系統裡。這些文件原始內容(譯者注:raw contents,我認為就是網頁中的剔除html標記後的內容、pdf和word等有格式文件中提取的文字內容等) 的大小超過了20TB。索引程式是通過一系列的MapReduce操作(大約5到10次)來建立索引。使用MapReduce(替換上一個特別設計的、分散式處理的索引程式)帶來這些好處:
-
實現索引部分的程式碼簡單、小巧、容易理解,因為對於容錯、分散式以及平行計算的處理都是MapReduce庫提供的。比如,使用MapReduce庫,計算的程式碼行數從原來的3800行C++程式碼減少到大概700行程式碼。
-
MapReduce庫的效能已經足夠好了,因此我們可以把在概念上不相關的計算步驟分開處理,而不是混在一起以期減少資料傳遞的額外消耗。概念上不相關的計算步驟的隔離也使得我們可以很容易改變索引處理方式。比如,對之前的索引系統的一個小更改可能要耗費好幾個月的時間,但是在使用MapReduce的新系統上,這樣的更改只需要花幾天時間就可以了。
-
索引系統的操作管理更容易了。因為由機器失效、機器處理速度緩慢、以及網路的瞬間阻塞等引起的絕大部分問題都已經由MapReduce庫解決了,不再需要操作人員的介入了。另外,我們可以通過在索引系統叢集中增加機器的簡單方法提高整體處理效能。
7、相關工作
很多系統都提供了嚴格的程式設計模式,並且通過對程式設計的嚴格限制來實現平行計算。例如,一個結合函式可以通過把N個元素的陣列的字首在N個處理器上使用並行字首演算法,在log N的時間內計算完[6,9,13](譯者注:完全沒有明白作者在說啥,具體參考相關6、9、13文件)。 MapReduce可以看作是我們結合在真實環境下處理海量資料的經驗,對這些經典模型進行簡化和萃取的成果。更加值得驕傲的是,我們還實現了基於上千臺處理器的叢集的容錯處理。相比而言,大部分併發處理系統都只在小規模的叢集上實現,並且把容錯處理交給了程式設計師。
Bulk Synchronous Programming[17]和一些MPI原語[11]提供了更高級別的並行處理抽象,可以更容易寫出並行處理的程式。MapReduce和這些系統的關鍵不同之處在於,MapReduce利用限制性程式設計模式實現了使用者程式的自動併發處理,並且提供了透明的容錯處理。
我們資料本地優化策略的靈感來源於active disks[12,15]等技術,在active disks中,計算任務是儘量推送到資料儲存的節點處理(譯者注:即靠近資料來源處理), 這樣就減少了網路和IO子系統的吞吐量。我們在掛載幾個硬碟的普通機器上執行我們的運算,而不是在磁碟處理器上執行我們的工作,但是達到的目的一樣的。
我們的備用任務機制和Charlotte System[3]提出的eager排程機制比較類似。Eager排程機制的一個缺點是如果一個任務反覆失效,那麼整個計算就不能完成。我們通過忽略引起故障的記錄的方式在某種程度上解決了這個問題。
MapReduce的實現依賴於一個內部的叢集管理系統,這個叢集管理系統負責在一個超大的、共享機器的叢集上分佈和執行使用者任務。雖然這個不是本論文的重點,但是有必要提一下,這個叢集管理系統在理念上和其它系統,如Condor[16]是一樣。
MapReduce庫的排序機制和NOW-Sort[1]的操作上很類似。讀取輸入源的機器(map workers)把待排序的資料進行分割槽後,傳送到R個Reduce worker中的一個進行處理。每個Reduce worker在本地對資料進行排序(儘可能在記憶體中排序)。當然,NOW-Sort沒有給使用者自定義的Map和Reduce函式的機會,因此不具備MapReduce庫廣泛的實用性。
River[2]提供了一個程式設計模型:處理程序通過分散式佇列傳送資料的方式進行互相通訊。和MapReduce類似,River系統嘗試在不對等的硬體環境下,或者在系統顛簸的情況下也能提供近似平均的效能。River是通過精心排程硬碟和網路的通訊來平衡任務的完成時間。MapReduce庫採用了其它的方法。通過對程式設計模型進行限制,MapReduce框架把問題分解成為大量的“小”任務。這些任務在可用的worker叢集上動態的排程,這樣快速的worker就可以執行更多的任務。通過對程式設計模型進行限制,我們可用在工作接近完成的時候排程備用任務,縮短在硬體配置不均衡的情況下縮小整個操作完成的時間(比如有的機器效能差、或者機器被某些操作阻塞了)。
BAD-FS[5]採用了和MapReduce完全不同的程式設計模式,它是面向廣域網(譯者注:wide-area network) 的。不過,這兩個系統有兩個基礎功能很類似。(1)兩個系統採用重新執行的方式來防止由於失效導致的資料丟失。(2)兩個都使用資料本地化排程策略,減少網路通訊的資料量。
TACC[7]是一個用於簡化構造高可用性網路服務的系統。和MapReduce一樣,它也依靠重新執行機制來實現的容錯處理。
8、結束語
MapReduce程式設計模型在Google內部成功應用於多個領域。我們把這種成功歸結為幾個方面:首先,由於MapReduce封裝了並行處理、容錯處理、資料本地化優化、負載均衡等等技術難點的細節,這使得MapReduce庫易於使用。即便對於完全沒有並行或者分散式系統開發經驗的程式設計師而言;其次,大量不同型別的問題都可以通過MapReduce簡單的解決。比如,MapReduce用於生成Google的網路搜尋服務所需要的資料、用來排序、用來資料探勘、用於機器學習,以及很多其它的系統;第三,我們實現了一個在數千臺計算機組成的大型叢集上靈活部署執行的MapReduce。這個實現使得有效利用這些豐富的計算資源變得非常簡單,因此也適合用來解決Google遇到的其他很多需要大量計算的問題。
我們也從MapReduce開發過程中學到了不少東西。首先,約束程式設計模式使得並行和分散式計算非常容易,也易於構造容錯的計算環境;其次,網路頻寬是稀有資源。大量的系統優化是針對減少網路傳輸量為目的的:本地優化策略使大量的資料從本地磁碟讀取,中間檔案寫入本地磁碟、並且只寫一份中間檔案也節約了網路頻寬;第三,多次執行相同的任務可以減少效能緩慢的機器帶來的負面影響(譯者注:即硬體配置的不平衡), 同時解決了由於機器失效導致的資料丟失問題。
附錄A、單詞頻率統計
本節包含了一個完整的程式,用於統計在一組命令列指定的輸入檔案中,每一個不同的單詞出現頻率。
#include “mapreduce/mapreduce.h”
// User’s map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),”1″);
}
}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into “spec”
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format(“text”);
input->set_filepattern(argv[i]);
input->set_mapper_class(“WordCounter”);
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// …
MapReduceOutput* out = spec.output();
out->set_filebase(“/gfs/test/freq”);
out->set_num_tasks(100);
out->set_format(“text”);
out->set_reducer_class(“Adder”);
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class(“Adder”);
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: ‘result’ structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}
Google MapReduce中文版的原文地址 ,中文版原址 ,譯者Alex
感謝Frankie 對本文第一部分內容的貢獻。