MapReduce實現
本文是mit6.824的lab1實現記錄,目前已通過所有test,但終端會有EOF報錯,暫時還未排查到錯誤。
基本架構
MapReduce模型角色有兩種:master和worker。master是map和reduce任務的排程者,起碼任務排程和請求處理是非同步的,所以要考慮多執行緒。worker是請求並處理單個任務,是單執行緒。
Master
主要工作如下:
- 註冊worker
- 初始化任務陣列,map階段初始化map任務陣列,reduce階段可以理解為重置為reduce陣列。
- 分派任務
- 任務排程,定時進行,需要檢查所有任務的完成狀況,以確定是否進入下一階段。
Task
任務有五種狀態:Ready,Queue,Running,Completed和Err,通過判斷狀態我們進行不同的處理。Ready是在初始化完成後預設的,需要加入通道並更新狀態為Queue;當請求來臨時,我們派發任務(注意擴充任必要資訊)並更新狀態為Running;根據worker返回的任務報告或超時判斷更新狀態為Completed或者Err;對於Err任務更改狀態為Queue,並加入通道。
注:需要任務陣列的原因:根據下標跟蹤任務狀態
Schedule方法
這裡進行我們任務排程的大部分工作,排程策略是:遍歷任務陣列並更新狀態,按照Task中的思路。同時判斷是否進行任務重置。
Worker
worker一經啟動就不停的去請求並執行任務,所以需要對map和reduce任務分別處理,這裡我們的master返回的結構體是一樣的,按需取用即可。
呼叫Worker方法,首先我們傳送Worker註冊請求,序列號遞增(這裡不用加鎖)。然後就是不斷迴圈去請求並執行任務。另外,我們需要儲存一個isAlive變數,判斷worker是否可以退出。
我們的CallReqTask方法會得到三種響應,Map,Reduce和Over,在Master完成ReducePahse階段後,會將之後的請求任務型別全部置為Over,以便Worker能夠及時退出。
注:map和reduce任務我們需要執行後報告master,但over偽任務只需要更新isAlive狀態即可,我們稍後即可退出。
doMapTask
為了讓每個map任務能夠生成nReduce箇中間檔案,我們make一個二維陣列,然後取模存放:
reduces := wmake([][]KeyValue, task.NReduce)
for _, kv := range kvs {
idx := ihash(kv.Key) % task.NReduce
reduces[idx] = append(reduces[idx], kv)
}
doReduceTask
首先,我們的輸入檔案有NMap個,按照mr-X-Y格式,這裡的Y我們規定為任務序列號,從而能夠讀取正確的檔案。我們定義一個map陣列來存放鍵值對應的序列隊,並使用reducef函式輸出到最終檔案。