Mit6.824 Lab1 MapReduce實現
paper地址:http://nil.csail.mit.edu/6.824/2021/schedule.html
MapReduce 原理
- 啟動MapReduce, 將輸入檔案切分成大小在16-64MB之間的檔案。然後在一組多個機器上啟動使用者程式
- 其中一個副本將成為master, 餘下成為worker. master給worker指定任務(M個map任務,R個reduce任務)。master選擇空閒的worker給予map或reduce任務
- Map worker 接收切分後的input,執行Map函式,將結果快取到記憶體
- 快取後的中間結果會週期性的寫到本地磁碟,並切分成R份(reducer數量)。R個檔案的位置會發送給master, master轉發給reducer
- Reduce worker 收到中間檔案的位置資訊,通過RPC讀取。讀取完先根據中間<k, v>排序,然後按照key分組、合併。
- Reduce worker在排序後的資料上迭代,將中間<k, v> 交給reduce 函式處理。最終結果寫給對應的output檔案(分片)
- 所有map和reduce任務結束後,master喚醒使用者程式
MapReduce 實現流程
Master
論文提到每個(Map或者Reduce)Task有分為idle, in-progress, completed 三種狀態。
// 列舉,表示任務執行階段,根據論文,分為空閒、執行中、已完成 const ( Idle MasterTaskStatus = iota InProgress Completed )
Master 儲存Task的資訊
// Master記錄的任務資訊,包含任務執行階段、任務開始時間,Task物件的指標
type MasterTask struct {
TaskStatus MasterTaskStatus // 任務執行階段
StartTime time.Time // 任務開始執行時間
TaskReference *Task // 表示當前執行的是哪個任務
}
Master儲存Map任務產生的R箇中間檔案的資訊。
// Master節點物件 type Master struct { TaskQueue chan *Task // 儲存Task的佇列,通過channel通道實現佇列 TaskMeta map[int]*MasterTask // 當前系統所有task的資訊,key為taskId MasterPhase State // Master階段 NReduce int // R個Reduce工作執行緒 InputFiles []string // 輸入檔名 Intermediates [][]string // M行R列的二維陣列,儲存Map任務產生的M*R箇中間檔案 }
Map和Reduce使用同一個Task結構,完全可以兼顧兩個階段的任務。
// 任務物件
type Task struct {
Input string // 任務負責處理的輸入檔名
TaskState State // 任務狀態
NReducer int // R個Reducer
TaskNumber int // TaskId
Intermediates []string // 儲存Map任務產生的R箇中間檔案的磁碟路徑
Output string // 輸出檔名
}
將task和master的狀態合併成一個State
type State int
// 列舉,表示Master和Task的狀態
const (
Map State = iota // 從0開始列舉
Reduce
Exit
Wait
)
MapReduce執行Map和Reduce實現
1. 啟動master
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
// 建立Master節點,負責分發任務,作為服務註冊中心、服務排程中心
func MakeMaster(files []string, nReduce int) *Master {
// 建立Master節點
m := Master{
// 儲存task的佇列,通過chan通道實現先進先出
TaskQueue: make(chan *Task, max(nReduce, len(files))),
// 主要作用是通過taskId這個key獲取到對應的Task資訊
TaskMeta: make(map[int]*MasterTask),
// 一開始Master和Task都處於Map階段
MasterPhase: Map,
NReduce: nReduce,
InputFiles: files,
// 建立二維陣列儲存Map階段生成的中間檔案路徑,設定列數為nReduce
Intermediates: make([][]string, nReduce),
}
// TODO 將files中的檔案切分成16MB-64MB的檔案
// 建立Map任務
m.createMapTask()
// 啟動Master節點,將Master的方法都註冊到註冊中心,worker就可以通過RPC訪問Master的方法
m.server()
// crash,啟動一個協程來不斷檢查超時的任務
go m.catchTimeOut()
return &m
}
建立Map任務
// 建立Map任務
func (m *Master) createMapTask() {
// 遍歷所有的輸入檔案,每個檔案用一個Map任務處理
for idx, fileName := range m.InputFiles {
// 建立Map Task物件
taskMeta := Task{
Input: fileName,
TaskState: Map,
NReducer: m.NReduce,
TaskNumber: idx,
}
// Task物件放入佇列
m.TaskQueue <- &taskMeta
// 填充Master對當前佇列中所有Task的資訊, taskId為key,value儲存task資訊
m.TaskMeta[idx] = &MasterTask{
TaskStatus: Idle,
TaskReference: &taskMeta,
}
}
}
不斷檢查超時任務,提高執行效率
// crash,啟動一個協程來不斷檢查超時的任務
func (m *Master) catchTimeOut() {
for {
time.Sleep(5 * time.Second)
// 鎖住其他執行緒可能會使用的m.MasterPhase
mu.Lock()
// Master節點的執行狀態是退出狀態,則退出檢查
if m.MasterPhase == Exit {
mu.Unlock()
return
}
// 檢查所有任務
for _, masterTask := range m.TaskMeta {
// 任務執行中並且執行時間大於10秒,則重新放入佇列等待被其他worker執行
if masterTask.TaskStatus == InProgress && time.Now().Sub(masterTask.StartTime) > 10*time.Second {
m.TaskQueue <- masterTask.TaskReference
masterTask.TaskStatus = Idle
}
}
mu.Unlock()
}
}
2. master監聽worker RPC呼叫,分配任務
// 等待worker通過rpc請求Master的服務
func (m *Master) AssignTask(args *ExampleArgs, reply *Task) error {
// 鎖住Master節點
mu.Lock()
defer mu.Unlock()
// 佇列裡還有空閒任務
if len(m.TaskQueue) > 0 {
// taskQueue還有空閒的task就發出一個Task指標給一個worker
*reply = *<-m.TaskQueue
// 設定Task狀態
m.TaskMeta[reply.TaskNumber].TaskStatus = InProgress
m.TaskMeta[reply.TaskNumber].StartTime = time.Now()
} else if m.MasterPhase == Exit {
// 佇列裡還有任務但是Master狀態為Exit
// 返回一個帶著Exit狀態的Task,表示Master已經終止服務了
*reply = Task{
TaskState: Exit,
}
} else {
// 佇列裡沒有任務,則讓請求的worker等待
*reply = Task{
TaskState: Wait,
}
}
return nil
}
3. 啟動worker
// main/mrworker.go calls this function.
// 啟動Worker
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
for {
// 通過RPC獲取空閒任務
task := getTask()
// 根據任務當前的執行狀態進行相應處理
switch task.TaskState {
case Map:
mapper(&task, mapf)
case Reduce:
reducer(&task, reducef)
case Wait:
time.Sleep(5 * time.Second)
case Exit:
return
}
}
}
4. worker向master傳送RPC請求任務
// 通過RPC獲取空閒任務
func getTask() Task {
args := ExampleArgs{}
reply := Task{}
// RPC請求呼叫Master的服務來獲取Task
call("Master.AssignTask", &args, &reply)
return reply
}
5. worker獲得MapTask,交給mapper處理
// 執行Map任務
func mapper(task *Task, mapf func(string, string) []KeyValue) {
// 獲取任務對應的檔案路徑
content, err := ioutil.ReadFile(task.Input)
if err != nil {
log.Fatal("Failed to read file: "+task.Input, err)
}
// 執行wc.go中的mapf方法,進行MapReduce的map階段,得到nReduce箇中間檔案路徑的字串陣列
intermediates := mapf(task.Input, string(content))
// 將map階段生成的中間檔案路徑儲存到列數為NReducer的二維陣列中
buffer := make([][]KeyValue, task.NReducer)
// 儲存結果到記憶體buffer中
for _, intermediate := range intermediates {
// 根據key進行hash,將結果切分成NReducer份
slot := ihash(intermediate.Key) % task.NReducer
buffer[slot] = append(buffer[slot], intermediate)
}
// 週期性地從記憶體儲存到磁碟中
mapOutput := make([]string, 0)
for i := 0; i < task.NReducer; i++ {
// 將中間結果寫入到NReducer箇中間臨時檔案中
mapOutput = append(mapOutput, writeToLocalFile(task.TaskNumber, i, &buffer[i]))
}
// NReducer個檔案的路徑儲存到記憶體,Master就可以獲取到
task.Intermediates = mapOutput
// 設定該任務狀態為已完成
TaskCompleted(task)
}
6. worker任務完成後通知master
func TaskCompleted(task *Task) {
reply := ExampleReply{}
call("Master.TaskCompleted", task, &reply)
}
7. master收到完成後的Task
// 更新Task狀態為已完成並檢查
func (m *Master) TaskCompleted(task *Task, reply *ExampleReply) error {
mu.Lock()
defer mu.Unlock()
// 容錯、檢查節點狀態、檢查重複任務
if task.TaskState != m.MasterPhase || m.TaskMeta[task.TaskNumber].TaskStatus == Completed {
// 重複任務要丟棄
return nil
}
m.TaskMeta[task.TaskNumber].TaskStatus = Completed
go m.processTaskResult(task)
return nil
}
- 如果所有的ReduceTask都已經完成,轉入Exit階段
// master通過協程獲取任務執行的結果
func (m *Master) processTaskResult(task *Task) {
mu.Lock()
defer mu.Unlock()
switch task.TaskState {
case Map:
// Map階段則收集中間結果到Master記憶體中
// key為taskId,value為檔案路徑的字串陣列,一個task有NReducer個filePath
for reduceTaskId, filePath := range task.Intermediates {
m.Intermediates[reduceTaskId] = append(m.Intermediates[reduceTaskId], filePath)
}
// 所有任務已完成則進入reduce階段
if m.allTaskDone() {
m.createReduceTask()
m.MasterPhase = Reduce
}
case Reduce:
// Reduce則設定狀態為Exit
if m.allTaskDone() {
m.MasterPhase = Exit
}
}
}
8. 如果所有的MapTask都已經完成,建立ReduceTask,轉入Reduce階段
// 執行Reduce任務
func reducer(task *Task, reducef func(string, []string) string) {
// 從磁碟中讀取中間檔案
intermediate := *readFromLocalFile(task.Intermediates)
// 根據key進行字典序排序
sort.Sort(ByKey(intermediate))
dir, _ := os.Getwd()
tempFile, err := ioutil.TempFile(dir, "mr-2021-tmp-*")
if err != nil {
log.Fatal("Failed to create temp file", err)
}
i := 0
// 遍歷每一個key
for i < len(intermediate) {
j := i + 1
// 相同的key分組合並
for j < len(intermediate) && intermediate[i].Key == intermediate[j].Key {
j++
}
// 儲存該key的最終計數, 即對相同key的計數進行合併統計
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
// 結果交給reducef進行統計
output := reducef(intermediate[i].Key, values)
// 最終結果的字串內容儲存到臨時檔案裡
fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
i = j
}
tempFile.Close()
// 定義輸出檔案的檔名
oname := fmt.Sprintf("mr-2021-out-%d", task.TaskNumber)
os.Rename(tempFile.Name(), oname)
task.Output = oname
TaskCompleted(task)
}
9. master確認所有ReduceTask都已經完成,轉入Exit階段,終止所有master和worker goroutine
//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
mu.Lock()
defer mu.Unlock()
ret := m.MasterPhase == Exit
return ret
}
- 併發
因為Master儲存Task相關的資訊,因此在worker執行任務時,是需要對Master進行併發修改的,所以需要進行上鎖。master跟多個worker通訊,master的資料是共享的。
// Master節點物件
type Master struct {
TaskQueue chan *Task // 儲存Task的佇列,通過channel通道實現佇列
TaskMeta map[int]*MasterTask // 當前系統所有task的資訊,key為taskId
MasterPhase State // Master階段
NReduce int // R個Reduce工作執行緒
InputFiles []string // 輸入檔名
Intermediates [][]string // M行R列的二維陣列,儲存Map任務產生的M*R箇中間檔案
}
其中TaskMeta, Phase, Intermediates, TaskQueue
都有讀寫發生。TaskQueue
使用channel
實現,自己帶鎖。只有涉及Intermediates, TaskMeta, Phase
的操作需要上鎖,InputFiles 和 NReduce 因為是在建立Master時一次性寫入,所以不會出現併發寫的場景。
11.容錯
- 週期性向worker傳送心跳檢測
- 如果worker失聯一段時間,master將worker標記成failed
- worker失效之後,已完成的map task被重新標記為idle,已完成的reduce task不需要改變
- 對於in-progress 且超時的任務,則重新放入佇列等待被其他worker執行
// crash,啟動一個協程來不斷檢查超時的任務
func (m *Master) catchTimeOut() {
for {
time.Sleep(5 * time.Second)
// 鎖住其他執行緒可能會使用的m.MasterPhase
mu.Lock()
// Master節點的執行狀態是退出狀態,則退出檢查
if m.MasterPhase == Exit {
mu.Unlock()
return
}
// 檢查所有任務
for _, masterTask := range m.TaskMeta {
// 任務執行中並且執行時間大於10秒,則重新放入佇列等待被其他worker執行
if masterTask.TaskStatus == InProgress && time.Now().Sub(masterTask.StartTime) > 10*time.Second {
m.TaskQueue <- masterTask.TaskReference
masterTask.TaskStatus = Idle
}
}
mu.Unlock()
}
}