谷歌三篇論文之二---MapReduce
MapReduce:超大機群上的簡單資料處理
MapReduce是一個程式設計模型,和處理、產生大資料集的相關實現。使用者指定一個map函式處理一個key/value對,從而產生中間的key/value對集。然後再指定一個reduce函式合併所有的具有相同中間key的中間value。下面將列舉許多可以用這個模型來表示的現實世界的工作。
以這種方式寫的程式能自動的在大規模的普通機器上實現並行化。這個執行時系統關心這些細節:分割輸入資料,在機群上的排程,機器的錯誤處理,管理機器之間必要的通訊。這樣就可以讓那些沒有並行分散式處理系統經驗的程式設計師利用大量分散式系統的資源。
我們的MapReduce實現執行在規模可以靈活調整的由普通機器組成的機群上,一個典型的MapReduce計算處理幾千臺機器上的以TB計算的資料。程式設計師發現這個系統非常好用:已經實現了數以百計的MapReduce程式,每天在Google的機群上都有1000多個MapReduce程式在執行。
1.介紹
在過去的5年裡,作者和Google的許多人已經實現了數以百計的為專門目的而寫的計算來處理大量的原始資料,比如,爬行的文件、Web請求日誌等等。為了計算各種型別的派生資料,比如,倒排索引,Web文件的圖結構的各種表示,每個主機上爬行的頁面數量的概要,每天被請求數量最多的集合,等等。很多這樣的計算在概念上很容易理解。然而,輸入的資料量很大,並且只有計算被分佈在成百上千的機器上才能在可以接受的時間內完成。怎樣平行計算,分發資料,處理錯誤,所有這些問題綜合在一起,使得原本很簡潔的計算,因為要大量的複雜程式碼來處理這些問題,而變得讓人難以處理。
作為對這個複雜性的迴應,我們設計一個新的抽象模型,它讓我們表示我們將要執行的簡單計算,而隱藏並行化、容錯、資料分佈、負載均衡的那些雜亂的細節,在一個庫裡。我們的抽象模型的靈感來自Lisp和許多其他函式語言的map和reduce的原始表示。我們認識到我們的許多計算都包含這樣的操作:在我們輸入資料的邏輯記錄上應用map操作,來計算出一箇中間key/value對集,在所有具有相同key的value上應用reduce操作,來適當的合併派生的資料。功能模型的使用,再結合使用者指定的map和reduce操作,讓我們可以非常容易的實現大規模並行化計算,和使用再次執行作為初級機制來實現容錯。
這個工作的主要貢獻是通過簡單有力的介面來實現自動的並行化和大規模分散式計算,結合這個介面的實現來在大量普通的PC機上實現高效能運算。
第二部分描述基本的程式設計模型,並且給一些例子。第三部分描述符合我們的基於叢集的計算環境的MapReduce的介面的實現。第四部分描述我們覺得程式設計模型中一些有用的技巧。第五部分對於各種不同的任務,測量我們實現的效能。第六部分探究在Google內部使用MapReduce作為基礎來重寫我們的索引系統產品。第七部分討論相關的和未來的工作。
2.程式設計模型
計算利用一個輸入key/value對集,來產生一個輸出key/value對集。MapReduce庫的使用者用兩個函式表達這個計算:map和reduce。
使用者自定義的map函式,接受一個輸入對,然後產生一箇中間key/value對集。MapReduce庫把所有具有相同中間key I的中間value聚合在一起,然後把它們傳遞給reduce函式。
使用者自定義的reduce函式,接受一箇中間key I和相關的一個value集。它合併這些value,形成一個比較小的value集。一般的,每次reduce呼叫只產生0或1個輸出value。通過一個迭代器把中間value提供給使用者自定義的reduce函式。這樣可以使我們根據記憶體來控制value列表的大小。
2.1 例項
考慮這個問題:計算在一個大的文件集合中每個詞出現的次數。使用者將寫和下面類似的虛擬碼:
map(String key,String value):
//key:文件的名字
//value:文件的內容
for each word w in value:
EmitIntermediate(w, “1”);
reduce(String key, Iterator values):
//key:一個詞
//values:一個計數列表
int result=0;
for each v in values:
result+=ParseInt(v);
Emit(AsString(resut));
map函式產生每個詞和這個詞的出現次數(在這個簡單的例子裡就是1)。reduce函式把產生的每一個特定的詞的計數加在一起。
另外,使用者用輸入輸出檔案的名字和可選的調節引數來填充一個mapreduce規範物件。使用者然後呼叫MapReduce函式,並把規範物件傳遞給它。使用者的程式碼和MapReduce庫連結在一起(用C++實現).附錄A包含這個例項的全部文字。
2.2型別
即使前面的虛擬碼寫成了字串輸入和輸出的term格式,但是概念上使用者寫的map和reduce函式有關聯的型別:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
例如:輸入的key、value和輸出的key、value的域不同。此外,中間key,value和輸出key、values的域相同。
我們的C++實現傳遞字串來和使用者自定義的函式互動,並把它留給使用者的程式碼,來在字串和適當的型別間進行轉換。
2.3 更多例項
這裡有一些讓人感興趣的簡單程式,可以容易的用MapReduce計算來表示。
分散式的Grep(UNIX工具程式, 可做檔案內的字串查詢):如果輸入行匹配給定的樣式,map函式就輸出這一行。reduce函式就是把中間資料複製到輸出。
計算URL訪問頻率:map函式處理web頁面請求的記錄,輸出(URL,1)。reduce函式把相同URL的value都加起來,產生一個(URL,記錄總數)的對。
倒轉網路連結圖:map函式為每個連結輸出(目標,源)對,一個URL叫做目標,包含這個URL的頁面叫做源。reduce函式根據給定的相關目標URLs連線所有的源URLs形成一個列表,產生(目標,源列表)對。
每個主機的術語向量:一個術語向量用一個(詞,頻率)列表來概述出現在一個文件或一個文件集中的最重要的一些詞。map函式為每一個輸入文件產生一個(主機名,術語向量)對(主機名來自文件的URL)。reduce函式接收給定主機的所有文件的術語向量。它把這些術語向量加在一起,丟棄低頻的術語,然後產生一個最終的(主機名,術語向量)對。
倒排索引:map函式分析每個文件,然後產生一個(詞,文件號)對的序列。reduce函式接受一個給定詞的所有對,排序相應的文件IDs,並且產生一個(詞,文件ID列表)對。所有的輸出對集形成一個簡單的倒排索引。它可以簡單的增加跟蹤詞位置的計算。
分散式排序:map函式從每個記錄提取key,並且產生一個(key,record)對。reduce函式不改變任何的對。這個計算依賴分割工具(在4.1描述)和排序屬性(在4.2描述)。
3 實現
MapReduce介面可能有許多不同的實現。根據環境進行正確的選擇。例如,一個實現對一個共享記憶體較小的機器是合適的,另外的適合一個大NUMA的多處理器的機器,而有的適合一個更大的網路機器的集合。
這部分描述一個在Google廣泛使用的計算環境的實現:用交換機連線的普通PC機的大叢集。我們的環境是:
1. Linux作業系統、雙處理器、2-4GB記憶體的機器。
2. 普通的網路硬體、每個機器的頻寬或者是百兆或者千兆、但是平均小於全部頻寬的一半。
3. 因為一個機群包含成百上千的機器,所有機器會經常出現問題。
4. 儲存用直接連到每個機器上的廉價IDE硬碟。一個從內部檔案系統發展起來的分散式檔案系統被用來管理儲存在這些磁碟上的資料。檔案系統用複製的方式在不可靠的硬體上來保證可靠性和有效性。
5. 使用者提交工作給排程系統。每個工作包含一個任務集,每個工作被排程者對映到機群中一個可用的機器集上。
3.1 執行預覽
通過自動分割輸入資料成一個有M個split的集,map呼叫被分佈到多臺機器上。輸入的split能夠在不同的機器上被並行處理。通過用分割函式分割中間key,來形成R個片(例如,hash(key) mod R),reduce呼叫被分佈到多臺機器上。分割數量(R)和分割函式由使用者來指定。
圖1顯示了我們實現的MapReduce操作的全部流程。當用戶的程式呼叫MapReduce的函式的時候,將發生下面的一系列動作(下面的數字和圖1中的數字標籤相對應):
1. 在使用者程式裡的MapReduce庫首先分割輸入檔案成M個片,每個片的大小一般從 16到64MB(使用者可以通過可選的引數來控制)。然後在機群中開始大量的拷貝程式。
2. 這些程式拷貝中的一個是master,其他的都是由master分配任務的worker。有M 個map任務和R個reduce任務將被分配。管理者分配一個map任務或reduce任務給一個空閒的worker。
3. 一個被分配了map任務的worker讀取相關輸入split的內容。它從輸入資料中分析出key/value對,然後把key/value對傳遞給使用者自定義的map函式。由map函式產生的中間key/value對被快取在記憶體中。
4. 快取在記憶體中的key/value對被週期性的寫入到本地磁碟上,通過分割函式把它們寫入R個區域。在本地磁碟上的快取對的位置被傳送給master,master負責把這些位置傳送給reduce worker。
5. 當一個reduce worker得到master的位置通知的時候,它使用遠端過程呼叫來從map worker的磁碟上讀取快取的資料。當reduce worker讀取了所有的中間資料後,它通過排序使具有相同key的內容聚合在一起。因為許多不同的key對映到相同的reduce任務,所以排序是必須的。如果中間資料比記憶體還大,那麼還需要一個外部排序。
6. reduce worker迭代排過序的中間資料,對於遇到的每一個唯一的中間key,它把key和相關的中間value集傳遞給使用者自定義的reduce函式。reduce函式的輸出被新增到這個reduce分割的最終的輸出檔案中。
7. 當所有的map和reduce任務都完成了,管理者喚醒使用者程式。在這個時候,在使用者程式裡的MapReduce呼叫返回到使用者程式碼。
在成功完成之後,mapreduce執行的輸出存放在R個輸出檔案中(每一個reduce任務產生一個由使用者指定名字的檔案)。一般,使用者不需要合併這R個輸出檔案成一個檔案 —– 他們經常把這些檔案當作一個輸入傳遞給其他的MapReduce呼叫,或者在可以處理多個分割檔案的分散式應用中使用他們。
3.2 master資料結構
master保持一些資料結構。它為每一個map和reduce任務儲存它們的狀態(空閒,工作中,完成)和worker機器(非空閒任務的機器)的標識。
master就像一個管道,通過它,中間檔案區域的位置從map任務傳遞到reduce任務。因此,對於每個完成的map任務,master儲存由map任務產生的R箇中間檔案區域的大小和位置。當map任務完成的時候,位置和大小的更新資訊被接受。這些資訊被逐步增加的傳遞給那些正在工作的reduce任務。
3.3 容錯
因為MapReduce庫被設計用來使用成百上千的機器來幫助處理非常大規模的資料,所以這個庫必須要能很好的處理機器故障。
worker故障
master週期性的ping每個worker。如果master在一個確定的時間段內沒有收到worker返回的資訊,那麼它將把這個worker標記成失效。因為每一個由這個失效的worker完成的map任務被重新設定成它初始的空閒狀態,所以它可以被安排給其他的worker。同樣的,每一個在失敗的worker上正在執行的map或reduce任務,也被重新設定成空閒狀態,並且將被重新排程。
在一個失敗機器上已經完成的map任務將被再次執行,因為它的輸出儲存在它的磁碟上,所以不可訪問。已經完成的reduce任務將不會再次執行,因為它的輸出儲存在全域性檔案系統中。
當一個map任務首先被worker A執行之後,又被B執行了(因為A失效了),重新執行這個情況被通知給所有執行reduce任務的worker。任何還沒有從A讀資料的reduce任務將從worker B讀取資料。
MapReduce可以處理大規模worker失敗的情況。例如,在一個MapReduce操作期間,在正在執行的機群上進行網路維護引起80臺機器在幾分鐘內不可訪問了,MapReduce master只是簡單的再次執行已經被不可訪問的worker完成的工作,繼續執行,最終完成這個MapReduce操作。
master失敗
可以很容易的讓管理者週期的寫入上面描述的資料結構的checkpoints。如果這個master任務失效了,可以從上次最後一個checkpoint開始啟動另一個master程序。然而,因為只有一個master,所以它的失敗是比較麻煩的,因此我們現在的實現是:如果master失敗,就中止MapReduce計算。客戶可以檢查這個狀態,並且可以根據需要重新執行MapReduce操作。
在錯誤面前的處理機制
當用戶提供的map和reduce操作對它的輸出值是確定的函式時,我們的分散式實現產生,和全部程式沒有錯誤的順序執行一樣,相同的輸出。
我們依賴對map和reduce任務的輸出進行原子提交來完成這個性質。每個工作中的任務把它的輸出寫到私有臨時檔案中。一個reduce任務產生一個這樣的檔案,而一個map任務產生R個這樣的檔案(一個reduce任務對應一個檔案。當一個map任務完成的時候,worker傳送一個訊息給master,在這個訊息中包含這R個臨時檔案的名字。如果master從一個已經完成的map任務再次收到一個完成的訊息,它將忽略這個訊息。否則,它在master的資料結構裡記錄這R個檔案的名字。
當一個reduce任務完成的時候,這個reduce worker原子的把臨時檔案重新命名成最終的輸出檔案。如果相同的reduce任務在多個機器上執行,多個重新命名呼叫將被執行,併產生相同的輸出檔案。我們依賴由底層檔案系統提供的原子重新命名操作來保證,最終的檔案系統狀態僅僅包含一個reduce任務產生的資料。
我們的map和reduce操作大部分都是確定的,並且我們的處理機制等價於一個順序的執行的這個事實,使得程式設計師可以很容易的理解程式的行為。當map或/和reduce操作是不確定的時候,我們提供雖然比較弱但是合理的處理機制。當在一個非確定操作的前面,一個reduce任務R1的輸出等價於一個非確定順序程式執行產生的輸出。然而,一個不同的reduce任務R2的輸出也許符合一個不同的非確定順序程式執行產生的輸出。
考慮map任務M和reduce任務R1、R2的情況。我們設定e(Ri)為已經提交的Ri的執行(有且僅有一個這樣的執行)。這個比較弱的語義出現,因為e(R1)也許已經讀取了由M的執行產生的輸出,而e(R2)也許已經讀取了由M的不同執行產生的輸出。
3.4 儲存位置
在我們的計算機環境裡,網路頻寬是一個相當缺乏的資源。我們利用把輸入資料(由GFS管理)儲存在機器的本地磁碟上來儲存網路頻寬。GFS把每個檔案分成64MB的一些塊,然後每個塊的幾個拷貝儲存在不同的機器上(一般是3個拷貝)。MapReduce的master考慮輸入檔案的位置資訊,並且努力在一個包含相關輸入資料的機器上安排一個map任務。如果這樣做失敗了,它嘗試在那個任務的輸入資料的附近安排一個map任務(例如,分配到一個和包含輸入資料塊在一個switch裡的worker機器上執行)。當執行巨大的MapReduce操作在一個機群中的一部分機器上的時候,大部分輸入資料在本地被讀取,從而不消耗網路頻寬。
3.5 任務粒度
象上面描述的那樣,我們細分map階段成M個片,reduce階段成R個片。M和R應當比worker機器的數量大許多。每個worker執行許多不同的工作來提高動態負載均衡,也可以加速從一個worker失效中的恢復,這個機器上的許多已經完成的map任務可以被分配到所有其他的worker機器上。
在我們的實現裡,M和R的範圍是有大小限制的,因為master必須做O(M+R)次排程,並且儲存O(M*R)個狀態在記憶體中(這個因素使用的記憶體是很少的,在O(M*R)個狀態片裡,大約每個map任務/reduce任務對使用一個位元組的資料)。
此外,R經常被使用者限制,因為每一個reduce任務最終都是一個獨立的輸出檔案。實際上,我們傾向於選擇M,以便每一個單獨的任務大概都是16到64MB的輸入資料(以便上面描述的位置優化是最有效的),我們把R設定成我們希望使用的worker機器數量的小倍數。我們經常執行MapReduce計算,在M=200000、R=5000、使用2000臺工作者機器的情況下。
3.6 備用任務
一個落後者是延長MapReduce操作時間的原因之一:一個機器花費一個異乎尋常地的長時間來完成最後的一些map或reduce任務中的一個。有很多原因可能產生落後者。例如,一個有壞磁碟的機器經常發生可以糾正的錯誤,這樣就使讀效能從30MB/s降低到3MB/s。機群排程系統也許已經安排其他的任務在這個機器上,由於計算要使用CPU、記憶體、本地磁碟、網路頻寬的原因,引起它執行MapReduce程式碼很慢。我們最近遇到的一個問題是:一個在機器初始化時的Bug引起處理器快取的失效,在一個被影響的機器上的計算效能有上百倍的影響。
我們有一個一般的機制來減輕這個落後者的問題。當一個MapReduce操作將要完成的時候,master排程備用程序來執行那些剩下的還在執行的任務。無論是原來的還是備用的執行完成了,工作都被標記成完成。我們已經調整了這個機制,通常只會佔用多幾個百分點的機器資源。我們發現這可以顯著的減少完成大規模MapReduce操作的時間。作為一個例子,將要在5.3描述的排序程式,在關閉掉備用任務的情況下,要比有備用任務的情況下多花44%的時間。
4 技巧
儘管簡單的map和reduce函式的功能對於大多數需求是足夠的了,但是我們開發了一些有用的擴充。這些將在這個部分描述。
4.1 分割函式
MapReduce使用者指定reduce任務和reduce任務需要的輸出檔案的數量。在中間key上使用分割函式,使資料分割後通過這些任務。一個預設的分割函式使用hash方法(例如,hash(key) mod R)。這個導致非常平衡的分割。然後,有的時候,使用其他的key分割函式來分割資料有非常有用的。例如,有時候,輸出的key是URLs,並且我們希望每個主機的所有條目保持在同一個輸出檔案中。為了支援像這樣的情況,MapReduce庫的使用者可以提供專門的分割函式。例如,使用”hash(Hostname(urlkey)) mod R”作為分割函式,使所有來自同一個主機的URLs儲存在同一個輸出檔案中。
4.2 順序保證
我們保證在一個給定的分割裡面,中間key/value對以key遞增的順序處理。這個順序保證可以使每個分割產出一個有序的輸出檔案,當輸出檔案的格式需要支援有效率的隨機訪問key的時候,或者對輸出資料集再作排序的時候,就很容易。
4.3 combiner函式
在某些情況下,允許中間結果key重複會佔據相當的比重,並且使用者定義的reduce函式滿足結合律和交換律。一個很好的例子就是在2.1部分的詞統計程式。因為詞頻率傾向於一個zipf分佈(齊夫分佈),每個map任務將產生成百上千個這樣的記錄。所有的這些計數將通過網路被傳輸到一個單獨的reduce任務,然後由reduce函式加在一起產生一個數字。我們允許使用者指定一個可選的combiner函式,先在本地進行合併一下,然後再通過網路傳送。
在每一個執行map任務的機器上combiner函式被執行。一般的,相同的程式碼被用在combiner和reduce函式。在combiner和reduce函式之間唯一的區別是MapReduce庫怎樣控制函式的輸出。reduce函式的輸出被儲存最終輸出檔案裡。combiner函式的輸出被寫到中間檔案裡,然後被髮送給reduce任務。
部分使用combiner可以顯著的提高一些MapReduce操作的速度。附錄A包含一個使用combiner函式的例子。
4.4 輸入輸出型別
MapReduce庫支援以幾種不同的格式讀取輸入資料。例如,文字模式輸入把每一行看作是一個key/value對。key是檔案的偏移量,value是那一行的內容。其他普通的支援格式以key的順序儲存key/value對序列。每一個輸入型別的實現知道怎樣把輸入分割成對每個單獨的map任務來說是有意義的(例如,文字模式的範圍分割確保僅僅在每行的邊界進行範圍分割)。雖然許多使用者僅僅使用很少的預定義輸入型別的一個,但是使用者可以通過提供一個簡單的reader介面來支援一個新的輸入型別。
一個reader不必要從檔案裡讀資料。例如,我們可以很容易的定義它從資料庫裡讀記錄,或從記憶體中的資料結構讀取。
4.5 副作用
有的時候,MapReduce的使用者發現在map操作或/和reduce操作時產生輔助檔案作為一個附加的輸出是很方便的。我們依靠應用程式寫來使這個副作用成為原子的。一般的,應用程式寫一個臨時檔案,然後一旦這個檔案全部產生完,就自動的被重新命名。
對於單個任務產生的多個輸出檔案來說,我們沒有提供其上的兩階段提交的原子操作支援。因此,一個產生需要交叉檔案連線的多個輸出檔案的任務,應該使確定性的任務。不過這個限制在實際的工作中並不是一個問題。
4.6 跳過錯誤記錄
有的時候因為使用者的程式碼裡有bug,導致在某一個記錄上map或reduce函式突然crash掉。這樣的bug使得MapReduce操作不能完成。雖然一般是修復這個bug,但是有時候這是不現實的;也許這個bug是在原始碼不可得到的第三方庫裡。有的時候也可以忽略一些記錄,例如,當在一個大的資料集上進行統計分析。我們提供一個可選的執行模式,在這個模式下,MapReduce庫檢測那些記錄引起的crash,然後跳過那些記錄,來繼續執行程式。
每個worker程式安裝一個訊號處理器來獲取記憶體段異常和匯流排錯誤。在呼叫一個使用者自定義的map或reduce操作之前,MapReduce庫把記錄的序列號儲存在一個全域性變數裡。如果使用者程式碼產生一個訊號,那個訊號處理器就會發送一個包含序號的”last gasp”UDP包給MapReduce的master。當master不止一次看到同一個記錄的時候,它就會指出,當相關的map或reduce任務再次執行的時候,這個記錄應當被跳過。
4.7 本地執行
除錯在map或reduce函式中問題是很困難的,因為實際的計算髮生在一個分散式的系統中,經常是有一個master動態的分配工作給幾千臺機器。為了簡化除錯和測試,我們開發了一個可替換的實現,這個實現在本地執行所有的MapReduce操作。使用者可以控制執行,這樣計算可以限制到特定的map任務上。使用者以一個標誌呼叫他們的程式,然後可以容易的使用他們認為好用的任何除錯和測試工具(例如,gdb)。
4.8 狀態資訊
master執行一個HTTP伺服器,並且可以輸出一組狀況頁來供人們使用。狀態頁顯示計算進度,像多少個任務已經完成,多少個還在執行,輸入的位元組數,中間資料位元組數,輸出位元組數,處理百分比,等等。這個頁也包含到標準錯誤的連結和由每個任務產生的標準輸出的連結。使用者可以根據這些資料預測計算需要花費的時間和是否需要更多的資源。當計算比預期的要慢很多的時候,這些頁面也可以被用來判斷是不是這樣。
此外,最上面的狀態頁顯示已經有多少個工作者失敗了,和當它們失敗的時候,那個map和reduce任務正在執行。當試圖診斷在使用者程式碼裡的bug時,這個資訊也是有用的。
4.9 計數器
MapReduce庫提供一個計數器工具,來計算各種事件的發生次數。例如,使用者程式碼想要計算所有處理的詞的個數或者被索引的德文文件的數量。
為了使用這個工具,使用者程式碼建立一個命名的計數器物件,然後在map或/和reduce函式裡適當的增加計數器。例如:
Counter * uppercase;
uppercase=GetCounter("uppercase");
map(String name,String contents):
for each word w in contents:
if(IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w,"1");
來自不同worker機器上的計數器值被週期性的傳送給master(在ping迴應裡)。master把來自成功的map和reduce任務的計數器值加起來,在MapReduce操作完成的時候,把它返回給使用者程式碼。當前計數器的值也被顯示在master狀態頁裡,以便人們可以檢視實際的計算進度。當計算計數器值的時候消除重複執行的影響,避免資料的累加(在備用任務的使用,和由於出錯的重新執行,可以產生重複執行)。
有些計數器值被MapReduce庫自動的維護,比如,被處理的輸入key/value對的數量和被產生的輸出key/value對的數量。
使用者發現計數器工具對於檢查MapReduce操作的完整性很有用。例如,在一些MapReduce操作中,使用者程式碼也許想要確保輸出對的數量完全等於輸入對的數量,或者處理過的德文文件的數量是在全部被處理的文件數量中屬於合理的範圍。
5 效能
在本節,我們用在一個大型叢集上執行的兩個計算來衡量MapReduce的效能。一個計算用來在一個大概1TB的資料中查詢特定的匹配串。另一個計算排序大概1TB的資料。
這兩個程式代表了MapReduce的使用者實現的真實的程式的一個大子集。一類是,把資料從一種表示轉化到另一種表示。另一類是,從一個大的資料集中提取少量的關心的資料。
5.1 機群配置
所有的程式在包含大概1800臺機器的機群上執行。機器的配置是:2個2G的Intel Xeon超執行緒處理器,4GB記憶體,兩個160GB IDE磁碟,一個千兆網絡卡。這些機器部署在一個由兩層的樹形交換網路中,在根節點上大概有100到2000G的頻寬。所有這些機器都有相同的部署(對等部署),因此任意兩點之間的來回時間小於1毫秒。
在4GB的記憶體裡,大概有1-1.5GB被用來執行在機群中其他的任務。這個程式是在週末的下午開始執行的,這個時候CPU、磁碟、網路基本上是空閒的。
5.2 Grep
這個Grep程式掃描大概10^10個、每個100位元組的記錄,查詢比較少的3字元的查詢串(這個查詢串出現在92337個記錄中)。輸入資料被分割成大概64MB的片(M=15000),全部的輸出存放在一個檔案中(R=1)。
圖2顯示計算過程隨時間變化的情況。Y軸表示輸入資料被掃描的速度。隨著更多的機群被分配給這個MapReduce計算,速度在逐步的提高,當有1764個worker的時候這個速度達到最高的30GB/s。當map任務完成的時候,速度開始下降,在計算開始後80秒,輸入的速度降到0。這個計算持續的時間大概是150秒。這包括了前面大概一分鐘的啟動時間。啟動時間用來把程式傳播到所有的機器上,等待GFS開啟1000個輸入檔案,得到必要的位置優化資訊。
5.3 排序
這個sort程式排序10^10個記錄,每個記錄100個位元組(大概1TB的資料)。這個程式是模仿TeraSort的。
這個排序程式只包含不到50行的使用者程式碼。其中有3行map函式用來從文字行提取10位元組的排序key,並且產生一個由這個key和原始文字行組成的中間key/value對。我們使用一個內建的Identity函式作為reduce操作。這個函式直接把中間key/value對作為輸出的key/value對。最終的排序輸出寫到一個2路複製的GFS檔案中(也就是程式的輸出會寫2TB的資料)。
像以前一樣,輸入資料被分割成64MB的片(M=15000)。我們把排序後的輸出寫到4000個檔案中(R=4000)。分割槽函式使用key的原始位元組來把資料分割槽到R個小片中。
我們以這個基準的分割函式,知道key的分佈情況。在一般的排序程式中,我們會增加一個預處理的MapReduce操作,這個操作用於取樣key的情況,並且用這個取樣的key的分佈情況來計算對最終排序處理的分割點。
圖3(a)顯示這個排序程式的正常執行情況。左上圖顯示輸入資料的讀取速度。這個速度最高到達13GB/s,並且在不到200秒所有map任務完成之後迅速滑落到0。注意到這個輸入速度小於Grep。這是因為這個排序map任務花費大概一半的時間和頻寬,來把中間資料寫到本地硬碟中。而Grep相關的中間資料可以忽略不計。
左中圖顯示資料通過網路從map任務傳輸給reduce任務的速度。當第一個map任務完成後,這個排序過程就開始了。圖示上的第一個高峰是啟動了第一批大概1700個reduce任務(整個MapReduce任務被分配到1700臺機器上,每個機器一次只執行一個reduce任務)。大概開始計算後的300秒,第一批reduce任務中的一些完成了,我們開始執行剩下的reduce任務。全部的排序過程持續了大概600秒的時間。
左下圖顯示排序後的資料被reduce任務寫入最終檔案的速度。因為機器忙於排序中間資料,所以在第一個排序階段的結束和寫階段的開始有一個延遲。寫的速度大概是2-4GB/s。大概開始計算後的850秒寫過程結束。包括前面的啟動過程,全部的計算任務持續的891秒。這個和TeraSort benchmark的最高紀錄1057秒差不多。
需要注意的事情是:因此位置優化的原因,很多資料都是從本地磁碟讀取的而沒有通過我們有限頻寬的網路,所以輸入速度比排序速度和輸出速度都要快。排序速度比輸出速度快的原因是輸出階段寫兩個排序後資料的拷貝(我們寫兩個副本的原因是為了可靠性和可用性)。我們寫兩份的原因是因為底層檔案系統的可靠性和可用性的要求。如果底層檔案系統用類似容錯編碼(erasure coding)的方式,而不採用複製寫的方式,在寫盤階段可以降低網路頻寬的要求。
5.4 備用任務的影響
在圖3(b)中,顯示我們不用備用任務的排序程式的執行情況。除了它有一個很長的幾乎沒有寫動作發生的尾巴外,執行流程和圖3(a)相似。在960秒後,只有5個reduce任務沒有完成。然而,就是這最後幾個落後者直到300秒後才完成。全部的計算任務執行了1283秒,多花了44%的時間。
5.5 機器失效
在圖3(c)中。顯示我們有意的在排序程式計算過程中停止1746臺worker中的200臺機器上的程式的情況。底層機群排程者在這些機器上馬上重新開始新的worker程式(因為僅僅程式被停止,而機器仍然在正常執行)。
因為已經完成的map工作丟失了(由於相關的map worker被殺掉了),需要重新再作,所以worker死掉會導致一個負數的輸入速率。相關map任務的重新執行很快就重新執行了。整個計算過程在933秒內完成,包括了前邊的啟動時間(只比正常執行時間多了5%的時間)。
6 經驗
我們在2003年的2月寫了MapReduce庫的第一個版本,並且在2003年的8月做了顯著的增強,包括位置優化,worker機器間任務執行的動態負載均衡,等等。從那個時候起,我們驚奇的發現MapReduce函式庫廣泛用於我們日常處理的問題。它現在在Google內部各個領域內廣泛應用,包括:
- 大規模機器學習問題
- Google News和Froogle產品的機器問題.
- 提取資料產生一個流行查詢的報告(例如,GoogleZeitgeist).
- 為新的試驗和產品提取網頁的屬性(例如,從一個web頁的大集合中提取位置資訊用在位置查詢).
- 大規模的圖計算.
圖4顯示了我們主要的原始碼管理系統中,隨著時間推移,MapReduce程式的顯著增加,從2003年早先時候的0個增長到2004年9月份的差不多900個不同的程式。MapReduce之所以這樣的成功,是因為他能夠在不到半小時時間內寫出一個簡單的能夠應用於上千臺機器的大規模併發程式,並且極大的提高了開發和原形設計的週期效率。並且,他可以讓一個完全沒有分散式和/或並行系統經驗的程式設計師,能夠很容易的利用大量的資源。
在每一個任務結束的時候,MapReduce函式庫記錄使用的計算資源的統計資訊。在表1裡,我們列出了2004年8月份在Google執行的一些MapReduce的工作的統計資訊.
6.1 大規模索引
到目前為止,最成功的MapReduce的應用就是重寫了Google web 搜尋服務所使用到的index系統。索引系統處理爬蟲系統抓回來的超大量的文件集,這些文件集儲存在GFS檔案裡。這些文件的原始內容的大小超過了20TB。索引程式是通過一系列的大概5到10次MapReduce操作來建立索引。通過利用MapReduce(替換掉上一個版本的特別設計的分佈處理的索引程式版本)有這樣一些好處:
索引的程式碼簡單,量少,容易理解,因為容錯、分散式、並行處理都隱藏在MapReduce庫中了。例如,當使用MapReduce函式庫的時候,計算的程式碼行數從原來的3800行C++程式碼一下減少到大概700行程式碼。
MapReduce的函式庫的效能已經非常好,所以我們可以把概念上不相關的計算步驟分開處理,而不是混在一起以期減少在資料上的處理。這使得改變索引過程很容易。例如,我們對老索引系統的一個小更改可能要好幾個月的時間,但是在新系統內,只需要花幾天時間就可以了。
索引系統的操作更容易了,這是因為機器的失效,速度慢的機器以及網路失效都已經由MapReduce自己解決了,而不需要操作人員的互動。另外,我們可以簡單的通過對索引系統增加機器的方式提高處理效能。
7 相關工作
很多系統都提供了嚴格的設計模式,並且通過對程式設計的嚴格限制來實現自動的平行計算。例如,一個結合函式可以通過N個元素的陣列的字首在N個處理器上使用並行字首計算在log N的時間內計算完。MapReduce是基於我們的大型現實計算的經驗,對這些模型的一個簡化和精煉。並且,我們還提供了基於上千臺處理器的容錯實現。而大部分併發處理系統都只在小規模的尺度上實現,並且機器的容錯還是程式設計師來控制的。
Bulk Synchronous Programming以及一些MPI primitives提供了更高級別的抽象,可以更容易寫出並行處理的程式。這些系統和MapReduce系統的不同之處在於MapReduce利用嚴格的程式設計模式自動實現使用者程式的併發處理,並且提供了透明的容錯處理。
我們本地的優化策略是受active disks等技術的啟發,在active disks中計算任務是儘量推送到靠近本地磁碟的處理單元上,這樣就減少了通過I/O子系統或網路的資料量。我們在少量磁碟直接連線到普通處理機執行,來代替直接連線到磁碟控制器的處理機上,但是一般的步驟是相似的。
我們的備用任務的機制和在Charlotte系統上的積極排程機制相似。這個簡單的積極排程的一個缺陷是,如果一個任務引起了一個重複性的失敗,那個整個計算將無法完成。我們通過在故障情況下跳過故障記錄的機制,在某種程度上解決了這個問題。
MapReduce實現依賴一個內建的機群管理系統來在一個大規模共享機器組上分佈和執行使用者任務。雖然這個不是本論文的重點,但是叢集管理系統在理念上和Condor等其他系統是一樣的。
在MapReduce庫中的排序工具在操作上和NOW-Sort相似。源機器(map worker)分割將要被排序的資料,然後把它傳送到R個reduce worker中的一個上。每個reduce worker來本地排序它的資料(如果可能,就在記憶體中)。當然,NOW-Sort沒有使用者自定義的map和reduce函式,使得我們的庫可以廣泛的應用。
River提供一個程式設計模型,在這個模型下,處理程序可以靠在分散式的佇列上傳送資料進行彼此通訊。和MapReduce一樣,River系統嘗試提供對不同應用有近似平均的效能,即使在不對等的硬體環境下或者在系統顛簸的情況下也能提供近似平均的效能。River是通過精心排程硬碟和網路的通訊,來平衡任務的完成時間。MapReduce不和它不同。利用嚴格程式設計模型,MapReduce構架來把問題分割成大量的任務。這些任務被自動的在可用的worker上排程,以便速度快的worker可以處理更多的任務。這個嚴格程式設計模型也讓我們可以在工作快要結束的時候安排冗餘的執行,來在非一致處理的情況減少完成時間(比如,在有慢機或者阻塞的worker的時候)。
BAD-FS是一個很MapReduce完全不同的程式設計模型,它的目標是在一個廣闊的網路上執行工作。然而,它們有兩個基本原理是相同的。(1)這兩個系統使用冗餘的執行來從由失效引起的資料丟失中恢復。(2)這兩個系統使用本地化排程策略來減少通過擁擠的網路連線傳送的資料數量。
TACC是一個被設計用來簡化高有效性網路服務結構的系統。和MapReduce一樣,它通過再次執行來實現容錯。
8 結束語
MapReduce程式設計模型已經在Google成功的用在不同的目的。我們把這個成功歸於以下幾個原因:第一,這個模型使用簡單,甚至對沒有並行和分散式經驗的程式設計師也是如此,因為它隱藏了並行化、容錯、位置優化和負載均衡的細節。第二,大量不同的問題可以用MapReduce計算來表達。例如,MapReduce被用來為Google的產品web搜尋服務、排序、資料探勘、機器學習和其他許多系統產生資料。第三,我們已經在一個好幾千臺計算機的大型叢集上開發實現了這個MapReduce。這個實現使得對於這些機器資源的利用非常簡單,因此也適用於解決Google遇到的其他很多需要大量計算的問題。
從這個工作中我們也學習到了一些東西。首先,嚴格的程式設計模型使得並行化和分散式計算簡單,並且也易於構造這樣的容錯計算環境。第二,網路頻寬是系統的瓶頸。因此在我們的系統中大量的優化目標是減少通過網路傳送的資料量,本地優化使用我們從本地磁碟讀取資料,並且把中間資料寫到本地磁碟,以保留網路頻寬。第三,冗餘的執行可以用來減少速度慢的機器的影響和控制機器失效和資料丟失。
感謝
Josh Levenberg校定和擴充套件了使用者級別的MapReduce API,並且結合他的適用經驗和其他人的改進建議,增加了很多新的功能。MapReduce從GFS中讀取和寫入資料。我們要感謝Mohit Aron、Howard Gobioff、Markus Gutschke、David Krame、Shun-Tak Leung和Josh Redstone,他們在開發GFS中的工作。我們還感謝Percy Liang Olcan Sercinoglu 在開發用於MapReduce的叢集管理系統得工作。Mike Burrows、Wilson Hsieh、Josh Levenberg、Sharon Perl、RobPike,Debby Wallach為本論文提出了寶貴的意見。OSDI的無名審閱者以及我們的稽核者Eric Brewer,在論文應當如何改進方面給出了有益的意見。最後,我們感謝Google的工程部的所有MapReduce的使用者,感謝他們提供了有用的反饋、建議以及錯誤報告等等。
A 單詞頻率統計
本節包含了一個完整的程式,用於統計在一組命令列指定的輸入檔案中,每一個不同的單詞出現頻率。
#include "mapreduce/mapreduce.h"
//使用者map函式
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
//跳過前導空格
while ((i < n) && isspace(text[i]))
i++;
// 查詢單詞的結束位置
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
//使用者的reduce函式
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
//迭代具有相同key的所有條目,並且累加它們的value
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
//提交這個輸入key的綜合
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// 把輸入檔案列表存入"spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
//指定輸出檔案:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// 可選操作:在map任務中做部分累加工作,以便節省頻寬
out->set_combiner_class("Adder");
// 調整引數: 使用2000臺機器,每個任務100MB記憶體
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// 執行它
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// 完成: 'result'結構包含計數、花費時間和使用機器的資訊
return 0;
}