1. 程式人生 > 其它 >MIT6.824之MapReduce實現

MIT6.824之MapReduce實現

MIT6.824之MapReduce實現

這篇文章主要是大致分析一下MapReduce的實現,具體程式碼見Github(https://github.com/iloveacm4/mapReduce)

首先,依照MIT的lab已經給出了大致程式碼。

我們的實現主要分為兩個部分,master.go 和 worker.go。另外還定義了用於RPC的結構rpc.go

// Master 結構體
type Master struct {
	// Your definitions here.
	AllFilesName        map[string]int	  //記錄檔名
	MapTaskNumCount     int
	NReduce             int               // 記錄需要生成的Reduce task數量
	InterFIlename       [][]string        // intermediate file
	MapFinished         bool
	ReduceTaskStatus    map[int]int      // 記錄Reduce狀態
	ReduceFinished      bool              // 記錄是否完成reduce,完成後任務結束
	RWLock              *sync.RWMutex
}

//兩個chan用來存放map任務 和 Reduce任務
var maptasks chan string          // chan for map task
var reducetasks chan int          // chan for reduce task
//master.go

//前期準備工作

func MakeMaster(files []string, nReduce int) *Master {} //初始化Master結構體,開啟server服務,會呼叫master.server()

func (m *Master) server() {} //註冊RPC,呼叫generateTask()產生任務,等待worker發來的任務請求

func (m *Master) generateTask() {} //將Master結構體中AllFileName所有檔案載入入 maptasks, 再處理完成後,再載入reduce任務進入reducetasks


// 接受遠端呼叫,為worker分配任務

func (m *Master) MyCallHandler(args *MyArgs, reply *MyReply) error {} //負責接受worker發來的請求,有三種請求MsgForTask,
//一是請求任務,請求map任務以及請求reduce任務,根據不同的請求作不同的處理。需要設定reply為相應的值。並且執行一個守護協程timerForWorker()
//二是請求結束map任務MsgForFinishMap,用來通知master map任務結束。設定AllFileName[filename]為Finished狀態
//三是請求結束reduce任務MsgForFinishReduce。同樣需要設定ReduceTaskStatus[index]為Finished狀態


func (m *Master)timerForWorker(taskType, identify string){} //設定定時器為10秒,10秒內完成了任務,則設定相應的任務為Finished
//否則重新將任務設定為未完成狀態並加入相應的任務佇列

//worker.go

func Worker(mapf func(string, string) []KeyValue, 
            reducef func(string, []string) string) {} //主函式,死迴圈,不斷地呼叫CallForTask()函式請求任務,並根據master分配的
												  //任務型別呼叫mapInWorker(), 或者reduceInWorker()


func CallForTask(msgType int,msgCnt string) MyReply {} // 呼叫RPC,返回請求到的任務


func mapInWorker(reply *MyReply,mapf func(string, string) []KeyValue) {} 
//呼叫mapf做map任務, 並呼叫WriteToJSONFile()寫入JSON檔案中,																   
//呼叫SendInterFiles,表明已處理到中間狀態																  
//並呼叫CallForTask(MsgForFinishMap, reply.Filename)表明結束該任務

func reduceInWorker(reply *MyReply, reducef func(string, []string) string) {} 
//與mapInWorker同理
//統計的原理是按照key排序,然後遍歷統計

func SendInterFiles(msgType int, msgCnt string, nReduceType int) MyReply {}
//傳送中間檔案到master表明位置