MIT-6.824 lab1
阿新 • Published: 2019-02-18
GitHub: https://github.com/haoweiz/MIT-6.824
Part1:
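Part 1 is fairly simple: we only need to modify the doMap and doReduce functions, which mainly involves reading and writing JSON files in Go. Let's briefly walk through the Part 1 test flow. The Sequential tests look like this: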
func TestSequentialSingle(t *testing.T) {
	mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}

func TestSequentialMany(t *testing.T) {
	mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}
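makeInputs(M int) takes the numbers in the range 0-100000 and writes them evenly across M files. The lab requires each input file to be split into N output files, so the doMap phase produces M*N files in total. We can first pass the file's contents through mapF (which is really the MapFunc function in test_test.go) and collect all the resulting key/value pairs in the slice keyvalue, then call getStartEnd to obtain the range of keyvalue that the Number-th reduce file should hold, and write that range to the file with an Encoder.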
// Imports used by the code below.
import (
	"bufio"
	"encoding/json"
	"io"
	"log"
	"os"
)

// Get the start and end index of Number if we divide Total keyvalue into nReduce parts
func getStartEnd(Number, nReduce, Total int) (start, end int) {
	part := Total / nReduce
	start = Number * part
	if Number == nReduce-1 {
		end = Total // the last part absorbs the remainder
	} else {
		end = (Number + 1) * part
	}
	return
}

// doMap manages one map task: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce task that will be run ("R" in the paper)
	mapF func(file string, contents string) []KeyValue,
) {
	//
	// You will need to write this function.
	//
	// The intermediate output of a map task is stored as multiple
	// files, one per destination reduce task. The file name includes
	// both the map task number and the reduce task number. Use the
	// filename generated by reduceName(jobName, mapTaskNumber, r) as
	// the intermediate file for reduce task r. Call ihash() (see below)
	// on each key, mod nReduce, to pick r for a key/value pair.
	//
	// mapF() is the map function provided by the application. The first
	// argument should be the input file name, though the map function
	// typically ignores it. The second argument should be the entire
	// input file contents. mapF() returns a slice containing the
	// key/value pairs for reduce; see common.go for the definition of
	// KeyValue.
	//
	// Look at Go's ioutil and os packages for functions to read
	// and write files.
	//
	// Coming up with a scheme for how to format the key/value pairs on
	// disk can be tricky, especially when taking into account that both
	// keys and values could contain newlines, quotes, and any other
	// character you can think of.
	//
	// One format often used for serializing data to a byte stream that the
	// other end can correctly reconstruct is JSON. You are not required to
	// use JSON, but as the output of the reduce tasks *must* be JSON,
	// familiarizing yourself with it here may prove useful. You can write
	// out a data structure as a JSON string to a file using the commented
	// code below. The corresponding decoding functions can be found in
	// common_reduce.go.
	//
	//   enc := json.NewEncoder(file)
	//   for _, kv := ... {
	//     err := enc.Encode(&kv)
	//
	// Remember to close the file after you have written all the values!
	//

	// Read inFile line by line and collect every pair mapF emits in keyvalue.
	// ReadLine may return a very long line in several chunks; that case is ignored here.
	var keyvalue []KeyValue
	fi, err := os.Open(inFile)
	if err != nil {
		log.Fatal("doMap Open: ", err)
	}
	defer fi.Close()
	br := bufio.NewReader(fi)
	for {
		line, _, err := br.ReadLine()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal("doMap ReadLine: ", err)
		}
		keyvalue = append(keyvalue, mapF(inFile, string(line))...)
	}

	// Divide keyvalue into nReduce contiguous parts and save them in nReduce files.
	// This splits by position, not by ihash(key) mod nReduce as the comment above suggests.
	for r := 0; r != nReduce; r++ {
		// reduceName replaces the hard-coded "mrtmp.test-%d-%d" pattern.
		file, err := os.Create(reduceName(jobName, mapTaskNumber, r))
		if err != nil {
			log.Fatal("doMap Create: ", err)
		}
		start, end := getStartEnd(r, nReduce, len(keyvalue))
		enc := json.NewEncoder(file)
		for _, kv := range keyvalue[start:end] {
			if err := enc.Encode(&kv); err != nil {
				log.Fatal("doMap Encode: ", err)
			}
		}
		file.Close()
	}
}
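To make the range split concrete, and to contrast it with the hash-based split that the lab comment recommends, here is a minimal standalone sketch. getStartEnd repeats the arithmetic from the listing above; ihash is assumed to match the lab's FNV-based helper, and partitionByHash is a hypothetical name used only for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// getStartEnd repeats the arithmetic from the listing above:
// equal-sized parts, with the last part absorbing the remainder.
func getStartEnd(number, nReduce, total int) (start, end int) {
	part := total / nReduce
	start = number * part
	end = start + part
	if number == nReduce-1 {
		end = total
	}
	return
}

// ihash is assumed to match the lab's helper: a non-negative FNV-1a hash.
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

// partitionByHash (hypothetical helper) assigns each key to bucket
// ihash(key) % nReduce, so equal keys always land in the same bucket.
func partitionByHash(keys []string, nReduce int) [][]string {
	buckets := make([][]string, nReduce)
	for _, k := range keys {
		r := ihash(k) % nReduce
		buckets[r] = append(buckets[r], k)
	}
	return buckets
}

func main() {
	// Positional split of 10 pairs across 3 reduce files.
	for r := 0; r < 3; r++ {
		start, end := getStartEnd(r, 3, 10)
		fmt.Printf("reduce %d gets keyvalue[%d:%d]\n", r, start, end)
	}
	// Hash split: both occurrences of "a" end up in one bucket.
	fmt.Println(partitionByHash([]string{"a", "b", "a", "c"}, 3))
}
```

With 10 pairs and 3 reduce tasks the positional split yields the ranges [0:3], [3:6], and [6:10]. The positional split does not guarantee that equal keys from different map tasks reach the same reduce task, which is the property the hash-based split provides.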
For doReduce, we need to read nMap files, decode all of the key/value pairs, and re-encode them into outFile.
// Imports used by the code below.
import (
	"encoding/json"
	"log"
	"os"
)

// doReduce manages one reduce task: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, sorts the
// intermediate key/value pairs by key, calls the user-defined reduce function
// (reduceF) for each key, and writes the output to disk.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	//
	// You will need to write this function.
	//
	// You'll need to read one intermediate file from each map task;
	// reduceName(jobName, m, reduceTaskNumber) yields the file
	// name from map task m.
	//
	// Your doMap() encoded the key/value pairs in the intermediate
	// files, so you will need to decode them. If you used JSON, you can
	// read and decode by creating a decoder and repeatedly calling
	// .Decode(&kv) on it until it returns an error.
	//
	// You may find the first example in the golang sort package
	// documentation useful.
	//
	// reduceF() is the application's reduce function. You should
	// call it once per distinct key, with a slice of all the values
	// for that key. reduceF() returns the reduced value for that key.
	//
	// You should write the reduce output as JSON encoded KeyValue
	// objects to the file named outFile. We require you to use JSON
	// because that is what the merger that combines the output
	// from all the reduce tasks expects. There is nothing special about
	// JSON -- it is just the marshalling format we chose to use. Your
	// output code will look something like this:
	//
	// enc := json.NewEncoder(file)
	// for key := ... {
	// 	enc.Encode(KeyValue{key, reduceF(...)})
	// }
	// file.Close()
	//

	// Create outFile; every decoded pair is re-encoded into it.
	file, err := os.Create(outFile)
	if err != nil {
		log.Fatal("doReduce Create: ", err)
	}
	defer file.Close()
	enc := json.NewEncoder(file)

	// Read the intermediate file from each map task and copy its pairs.
	// The pairs are copied as-is: they are not sorted or passed to reduceF.
	for m := 0; m != nMap; m++ {
		// reduceName replaces the hard-coded "mrtmp.test-%d-%d" pattern.
		fi, err := os.Open(reduceName(jobName, m, reduceTaskNumber))
		if err != nil {
			log.Fatal("doReduce Open: ", err)
		}
		dec := json.NewDecoder(fi)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // end of file, or a decode error
			}
			if err := enc.Encode(&kv); err != nil {
				log.Fatal("doReduce Encode: ", err)
			}
		}
		fi.Close()
	}
}
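The doReduce above copies the decoded pairs straight to outFile and never calls reduceF. The lab comment instead describes calling reduceF once per distinct key, in sorted key order, and a minimal sketch of that step might look like the following. This is not the author's implementation: reduceAll and countValues are hypothetical names, and KeyValue is assumed to match the type in the lab's common.go.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// KeyValue is assumed to mirror the pair type defined in the lab's common.go.
type KeyValue struct {
	Key   string
	Value string
}

// reduceAll (hypothetical helper) groups values by key, then calls reduceF
// once per distinct key in sorted key order and returns the reduced pairs.
func reduceAll(kvs []KeyValue, reduceF func(key string, values []string) string) []KeyValue {
	grouped := make(map[string][]string)
	for _, kv := range kvs {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	keys := make([]string, 0, len(grouped))
	for k := range grouped {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	out := make([]KeyValue, 0, len(keys))
	for _, k := range keys {
		out = append(out, KeyValue{k, reduceF(k, grouped[k])})
	}
	return out
}

// countValues is a hypothetical reduce function: it reports how many
// values were collected for the key.
func countValues(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kvs := []KeyValue{{"b", ""}, {"a", ""}, {"b", ""}}
	fmt.Println(reduceAll(kvs, countValues)) // prints [{a 1} {b 2}]
}
```

In a full doReduce, the pairs decoded from all nMap intermediate files would be fed into a step like this, and each returned pair would then be written to outFile with the JSON encoder.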
With these changes, the tests pass.