1. 程式人生 > 其它 >Mit6.824 Lab1 MapReduce實現

Mit6.824 Lab1 MapReduce實現

paper地址:http://nil.csail.mit.edu/6.824/2021/schedule.html

MapReduce 原理

  1. 啟動MapReduce, 將輸入檔案切分成大小在16-64MB之間的檔案。然後在一組多個機器上啟動使用者程式
  2. 其中一個副本將成為master, 餘下成為worker. master給worker指定任務(M個map任務,R個reduce任務)。master選擇空閒的worker給予map或reduce任務
  3. Map worker 接收切分後的input,執行Map函式,將結果快取到記憶體
  4. 快取後的中間結果會週期性的寫到本地磁碟,並切分成R份(reducer數量)。R個檔案的位置會發送給master, master轉發給reducer
  5. Reduce worker 收到中間檔案的位置資訊,通過RPC讀取。讀取完先根據中間<k, v>排序,然後按照key分組、合併。
  6. Reduce worker在排序後的資料上迭代,將中間<k, v> 交給reduce 函式處理。最終結果寫給對應的output檔案(分片)
  7. 所有map和reduce任務結束後,master喚醒使用者程式

MapReduce 實現流程

Master

論文提到每個(Map或者Reduce)Task有分為idle, in-progress, completed 三種狀態。

// 列舉,表示任務執行階段,根據論文,分為空閒、執行中、已完成
const (
	Idle MasterTaskStatus = iota
	InProgress
	Completed
)

Master 儲存Task的資訊

// Master記錄的任務資訊,包含任務執行階段、任務開始時間,Task物件的指標
type MasterTask struct {
	TaskStatus    MasterTaskStatus // 任務執行階段
	StartTime     time.Time        // 任務開始執行時間
	TaskReference *Task            // 表示當前執行的是哪個任務
}

Master儲存Map任務產生的R箇中間檔案的資訊。

// Master節點物件
type Master struct {
	TaskQueue     chan *Task          // 儲存Task的佇列,通過channel通道實現佇列
	TaskMeta      map[int]*MasterTask // 當前系統所有task的資訊,key為taskId
	MasterPhase   State               // Master階段
	NReduce       int                 // R個Reduce工作執行緒
	InputFiles    []string            // 輸入檔名
	Intermediates [][]string          // M行R列的二維陣列,儲存Map任務產生的M*R箇中間檔案
}

Map和Reduce使用同一個Task結構,完全可以兼顧兩個階段的任務。

// 任務物件
type Task struct {
	Input         string   // 任務負責處理的輸入檔名
	TaskState     State    // 任務狀態
	NReducer      int      // R個Reducer
	TaskNumber    int      // TaskId
	Intermediates []string // 儲存Map任務產生的R箇中間檔案的磁碟路徑
	Output        string   // 輸出檔名
}

將task和master的狀態合併成一個State

type State int
// 列舉,表示Master和Task的狀態
const (
	Map State = iota  // 從0開始列舉
	Reduce
	Exit
	Wait
)

MapReduce執行Map和Reduce實現

1. 啟動master

// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
// 建立Master節點,負責分發任務,作為服務註冊中心、服務排程中心
func MakeMaster(files []string, nReduce int) *Master {
	// 建立Master節點
	m := Master{
		// 儲存task的佇列,通過chan通道實現先進先出
		TaskQueue: make(chan *Task, max(nReduce, len(files))),
		// 主要作用是通過taskId這個key獲取到對應的Task資訊
		TaskMeta: make(map[int]*MasterTask),
		// 一開始Master和Task都處於Map階段
		MasterPhase: Map,
		NReduce:     nReduce,
		InputFiles:  files,
		// 建立二維陣列儲存Map階段生成的中間檔案路徑,設定列數為nReduce
		Intermediates: make([][]string, nReduce),
	}
	// TODO 將files中的檔案切分成16MB-64MB的檔案

	// 建立Map任務
	m.createMapTask()
	// 啟動Master節點,將Master的方法都註冊到註冊中心,worker就可以通過RPC訪問Master的方法
	m.server()
	// crash,啟動一個協程來不斷檢查超時的任務
	go m.catchTimeOut()
	return &m
}

建立Map任務

// 建立Map任務
func (m *Master) createMapTask() {
	// 遍歷所有的輸入檔案,每個檔案用一個Map任務處理
	for idx, fileName := range m.InputFiles {
		// 建立Map Task物件
		taskMeta := Task{
			Input:      fileName,
			TaskState:  Map,
			NReducer:   m.NReduce,
			TaskNumber: idx,
		}
		// Task物件放入佇列
		m.TaskQueue <- &taskMeta
		// 填充Master對當前佇列中所有Task的資訊, taskId為key,value儲存task資訊
		m.TaskMeta[idx] = &MasterTask{
			TaskStatus:    Idle,
			TaskReference: &taskMeta,
		}
	}
}

不斷檢查超時任務,提高執行效率

// crash,啟動一個協程來不斷檢查超時的任務
func (m *Master) catchTimeOut() {
	for {
		time.Sleep(5 * time.Second)
		// 鎖住其他執行緒可能會使用的m.MasterPhase
		mu.Lock()
		// Master節點的執行狀態是退出狀態,則退出檢查
		if m.MasterPhase == Exit {
			mu.Unlock()
			return
		}
		// 檢查所有任務
		for _, masterTask := range m.TaskMeta {
			// 任務執行中並且執行時間大於10秒,則重新放入佇列等待被其他worker執行
			if masterTask.TaskStatus == InProgress && time.Now().Sub(masterTask.StartTime) > 10*time.Second {
				m.TaskQueue <- masterTask.TaskReference
				masterTask.TaskStatus = Idle
			}
		}
		mu.Unlock()
	}
}

2. master監聽worker RPC呼叫,分配任務

// 等待worker通過rpc請求Master的服務
func (m *Master) AssignTask(args *ExampleArgs, reply *Task) error {
	// 鎖住Master節點
	mu.Lock()
	defer mu.Unlock()
	// 佇列裡還有空閒任務
	if len(m.TaskQueue) > 0 {
		// taskQueue還有空閒的task就發出一個Task指標給一個worker
		*reply = *<-m.TaskQueue
		// 設定Task狀態
		m.TaskMeta[reply.TaskNumber].TaskStatus = InProgress
		m.TaskMeta[reply.TaskNumber].StartTime = time.Now()
	} else if m.MasterPhase == Exit {
		// 佇列裡還有任務但是Master狀態為Exit
		// 返回一個帶著Exit狀態的Task,表示Master已經終止服務了
		*reply = Task{
			TaskState: Exit,
		}
	} else {
		// 佇列裡沒有任務,則讓請求的worker等待
		*reply = Task{
			TaskState: Wait,
		}
	}
	return nil
}

3. 啟動worker

// main/mrworker.go calls this function.
// 啟動Worker
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		// 通過RPC獲取空閒任務
		task := getTask()
		// 根據任務當前的執行狀態進行相應處理
		switch task.TaskState {
		case Map:
			mapper(&task, mapf)
		case Reduce:
			reducer(&task, reducef)
		case Wait:
			time.Sleep(5 * time.Second)
		case Exit:
			return
		}
	}
}

4. worker向master傳送RPC請求任務

// 通過RPC獲取空閒任務
func getTask() Task {
	args := ExampleArgs{}
	reply := Task{}
	// RPC請求呼叫Master的服務來獲取Task
	call("Master.AssignTask", &args, &reply)
	return reply
}

5. worker獲得MapTask,交給mapper處理

// 執行Map任務
func mapper(task *Task, mapf func(string, string) []KeyValue) {
	// 獲取任務對應的檔案路徑
	content, err := ioutil.ReadFile(task.Input)
	if err != nil {
		log.Fatal("Failed to read file: "+task.Input, err)
	}
	// 執行wc.go中的mapf方法,進行MapReduce的map階段,得到nReduce箇中間檔案路徑的字串陣列
	intermediates := mapf(task.Input, string(content))
	// 將map階段生成的中間檔案路徑儲存到列數為NReducer的二維陣列中
	buffer := make([][]KeyValue, task.NReducer)
	// 儲存結果到記憶體buffer中
	for _, intermediate := range intermediates {
		// 根據key進行hash,將結果切分成NReducer份
		slot := ihash(intermediate.Key) % task.NReducer
		buffer[slot] = append(buffer[slot], intermediate)
	}
	// 週期性地從記憶體儲存到磁碟中
	mapOutput := make([]string, 0)
	for i := 0; i < task.NReducer; i++ {
		// 將中間結果寫入到NReducer箇中間臨時檔案中
		mapOutput = append(mapOutput, writeToLocalFile(task.TaskNumber, i, &buffer[i]))
	}
	// NReducer個檔案的路徑儲存到記憶體,Master就可以獲取到
	task.Intermediates = mapOutput
	// 設定該任務狀態為已完成
	TaskCompleted(task)
}

6. worker任務完成後通知master

func TaskCompleted(task *Task) {
	reply := ExampleReply{}
	call("Master.TaskCompleted", task, &reply)
}

7. master收到完成後的Task

// 更新Task狀態為已完成並檢查
func (m *Master) TaskCompleted(task *Task, reply *ExampleReply) error {
	mu.Lock()
	defer mu.Unlock()
	// 容錯、檢查節點狀態、檢查重複任務
	if task.TaskState != m.MasterPhase || m.TaskMeta[task.TaskNumber].TaskStatus == Completed {
		// 重複任務要丟棄
		return nil
	}
	m.TaskMeta[task.TaskNumber].TaskStatus = Completed
	go m.processTaskResult(task)
	return nil
}
  • 如果所有的ReduceTask都已經完成,轉入Exit階段
// master通過協程獲取任務執行的結果
func (m *Master) processTaskResult(task *Task) {
	mu.Lock()
	defer mu.Unlock()
	switch task.TaskState {
	case Map:
		// Map階段則收集中間結果到Master記憶體中
		// key為taskId,value為檔案路徑的字串陣列,一個task有NReducer個filePath
		for reduceTaskId, filePath := range task.Intermediates {
			m.Intermediates[reduceTaskId] = append(m.Intermediates[reduceTaskId], filePath)
		}
		// 所有任務已完成則進入reduce階段
		if m.allTaskDone() {
			m.createReduceTask()
			m.MasterPhase = Reduce
		}
	case Reduce:
		// Reduce則設定狀態為Exit
		if m.allTaskDone() {
			m.MasterPhase = Exit
		}
	}
}

8. 如果所有的MapTask都已經完成,建立ReduceTask,轉入Reduce階段

// 執行Reduce任務
func reducer(task *Task, reducef func(string, []string) string) {
	// 從磁碟中讀取中間檔案
	intermediate := *readFromLocalFile(task.Intermediates)
	// 根據key進行字典序排序
	sort.Sort(ByKey(intermediate))

	dir, _ := os.Getwd()
	tempFile, err := ioutil.TempFile(dir, "mr-2021-tmp-*")
	if err != nil {
		log.Fatal("Failed to create temp file", err)
	}
	i := 0
	// 遍歷每一個key
	for i < len(intermediate) {
		j := i + 1
		// 相同的key分組合並
		for j < len(intermediate) && intermediate[i].Key == intermediate[j].Key {
			j++
		}
		// 儲存該key的最終計數, 即對相同key的計數進行合併統計
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		// 結果交給reducef進行統計
		output := reducef(intermediate[i].Key, values)
		// 最終結果的字串內容儲存到臨時檔案裡
		fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
	tempFile.Close()
	// 定義輸出檔案的檔名
	oname := fmt.Sprintf("mr-2021-out-%d", task.TaskNumber)
	os.Rename(tempFile.Name(), oname)
	task.Output = oname
	TaskCompleted(task)
}

9. master確認所有ReduceTask都已經完成,轉入Exit階段,終止所有master和worker goroutine

//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
	mu.Lock()
	defer mu.Unlock()
	ret := m.MasterPhase == Exit
	return ret
}
  1. 併發

因為Master儲存Task相關的資訊,因此在worker執行任務時,是需要對Master進行併發修改的,所以需要進行上鎖。master跟多個worker通訊,master的資料是共享的。

// Master節點物件
type Master struct {
	TaskQueue     chan *Task          // 儲存Task的佇列,通過channel通道實現佇列
	TaskMeta      map[int]*MasterTask // 當前系統所有task的資訊,key為taskId
	MasterPhase   State               // Master階段
	NReduce       int                 // R個Reduce工作執行緒
	InputFiles    []string            // 輸入檔名
	Intermediates [][]string          // M行R列的二維陣列,儲存Map任務產生的M*R箇中間檔案
}

其中TaskMeta, Phase, Intermediates, TaskQueue 都有讀寫發生。TaskQueue使用channel實現,自己帶鎖。只有涉及Intermediates, TaskMeta, Phase的操作需要上鎖,InputFiles 和 NReduce 因為是在建立Master時一次性寫入,所以不會出現併發寫的場景。

11.容錯

  1. 週期性向worker傳送心跳檢測
  • 如果worker失聯一段時間,master將worker標記成failed
  • worker失效之後,已完成的map task被重新標記為idle,已完成的reduce task不需要改變
  1. 對於in-progress 且超時的任務,則重新放入佇列等待被其他worker執行
// crash,啟動一個協程來不斷檢查超時的任務
func (m *Master) catchTimeOut() {
	for {
		time.Sleep(5 * time.Second)
		// 鎖住其他執行緒可能會使用的m.MasterPhase
		mu.Lock()
		// Master節點的執行狀態是退出狀態,則退出檢查
		if m.MasterPhase == Exit {
			mu.Unlock()
			return
		}
		// 檢查所有任務
		for _, masterTask := range m.TaskMeta {
			// 任務執行中並且執行時間大於10秒,則重新放入佇列等待被其他worker執行
			if masterTask.TaskStatus == InProgress && time.Now().Sub(masterTask.StartTime) > 10*time.Second {
				m.TaskQueue <- masterTask.TaskReference
				masterTask.TaskStatus = Idle
			}
		}
		mu.Unlock()
	}
}