1. 程式人生 > >MapReduce論文

MapReduce論文

摘要:使用者確定一個map對映函式來處理一個鍵值對來生成一系列中間的鍵值對,然後一個reduce規約函式來合併所有相同中間key(group)的中間value。
實時系統關係劃分輸入資料的細節,不同機器上排程程式執行,處理機器宕機問題,和管理要求的機器之間的交流。
我們意識到我們大多數的計算包含應用一個map操作到每個邏輯檔案(record),是為了計算一系列的中間的鍵值對,然後應用所有資料(具有相同的key)一個reduce規約操作,是為了合適地結合劃分資料。
2 Programming Model
包括兩個函式:Map, Reduce
Map: 由使用者編寫,輸入對併產生一系列中間的鍵值對。根據相同的中間key來分組所有的中間值,再傳輸給規約函式。
Reduce: 使用者編寫,得到中間鍵key 和一系列對應的值。合併這些值來構成一個可能更小的值集合。每個規約函式期望的輸出值個數為0-1個輸出。輸入到reduce函式的中間值是通過迭代器以此取出。這執行我們處理太大而不能一次讀取到記憶體的列表。
Example:

map(String key, String value):
//key: document name
//value:document contents
for each word w in value:
EmitIntermediate(w, ‘1’);

reduce(String key, Iterator values):
// key : a word
// values : a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map函式對每個詞加上一個關聯的出現次數計數(該例子中就是1)。規約函式對每個相同次發出的計數相加。
map : (k1, v1) -> list ( k2, v2)
reduce: (k2, list(v2)) -> list(v2)
也就是輸入的鍵值對和輸出的鍵值對不是同一領域,並且中間的鍵值對和輸出鍵值對是同一領域。

執行過程:準備工作:輸入部分(首先把資料等分),使用者程式fork出Master執行緒,map 的worker執行緒, reduce的worker執行緒。
接下來,master呼叫map 的worker將部分資料讀入,並生成中間鍵值對,寫入本地硬碟。
再master呼叫reduce的worker 將(遠端)硬碟中的中間鍵值對讀取,得到最終的輸出寫入檔案。

3 Master 資料結構
master節點必須包含一些資訊,例如對於每個map和reduce任務,master節點需要儲存每個worker的狀態(閒置,程序中,完成)和(對於非閒置任務)worker機器的身份。
master節點是通道,中間檔案區域的定位從map任務傳到reduce任務。隱藏對於每個完成的map任務,master節點儲存R箇中間檔案區域的定位和大小。當map任務完成時更新這些檔案定位和大小。這些資訊不斷傳給工作中的reduce任務worker。

總結:Master(works, states(1~w), identifies(1~w), location(1~R), size(1~R)).

3.3 Fault Tolerance
worker宕機
master節點週期性地ping每個worker(心跳機制)。
任何maps 任務由worker完成後都重設為閒置狀態。相似的,任何在處理中的map或者reduce任務在一個宕機的worker上也會重設為閒置狀態。

已完成的map任務出現失敗後需要重新執行,因為其輸出儲存在宕機worker上無法得到。完成的reduce任務不需要重新執行因為其輸出儲存在全域性檔案系統中。

總結: map任務處理結果在本地硬碟,而reduce任務從遠端得到資料,處理完後上傳全域性檔案系統。

Master 宕機
master節點需要週期性記錄master的資料結構的checkpoints。當一個master die後複製該資訊衝上一個checkpoint繼續開始。一般當master節點die後終止mapreduce計算,並重啟操作。(一般設為兩個master節點)

在出現failures時的語義:
當用戶設定的map和reduce操作是確定性的函式,分散式實現產生的結果和由一個無錯誤的序列化執行得到結果相同。
一個reduce任務產生一個私有的臨時檔案,一個map任務產生R個私有檔案。當map任務完成時,發生訊息給master, 其包括R個臨時檔案的名稱。若master接受完成資訊,它會忽略該檔名資訊。否則,master節點會記錄R個檔案的名稱。

當一個reduce任務完成,reduce woker原子性地改名其臨時的輸出檔案為最終的輸出檔案。若相同的reduce任務在多機上執行,多個改名呼叫將為相同的最終輸出檔案執行

3.4 定位
網路頻寬在計算環境中是個非常稀缺資源。我們儲存網路頻寬通過利用一個事實,就是儲存在本地硬碟的輸入資料(由GFS管理)來組成叢集。GFS劃分每個檔案為64Mb,並在不同機器上儲存每個塊的副本(一般為3)。MapReduce的master節點考慮輸入檔案的定位資訊,並試圖排程一個map任務在一臺機器,其包含一個對應輸入資料的副本。若失敗了,則排程該任務輸入資料近鄰的副本的一個map任務(例如,在相同網路中的worker機器,轉化到包含資料的另一個機器)。所以大多數資料讀取都是區域性的,沒有消耗頻寬。

3.5 任務粒度granularity(間隔尺寸)
我們將map節點劃分成M塊,reduce節點劃分成R塊。理想情況,M和R應該是大於worker機器的個數。使這些worker完成許多不同的任務來提升動態載入平衡,也當worker宕機時及時恢復:許多map任務完成時可以傳播到所有worker機器中。
M和R的小在實際中有約束,master節點必須在O(M+R)複雜度中做出排程決策,並保留O(M*R)(很小)個狀態在記憶體中。O(M*R)塊個狀態只有一個位元組,對於每個map/reduce任務對。
進一步,R經常受使用者約束因為每個reduce任務的輸出值都是在分離的輸出檔案裡截止。實踐上,我們試圖選擇M的大小,以便於每個獨立的任務分配大約16M到64M的輸入資料(以便於locality位置優化是最有效的???),我們讓R為一個較小的worker數的乘子。我們經常設定M = 200000, R = 5000, 使用2000個workers機器。

3.6 Backup(阻塞)任務
一個普遍的原因導致總時間增長的Straggler的出現:一個機器用了很長的時間來完成上次少量的map或者reduce任務之一。落伍者