1. 程式人生 > 程式設計 >MIT6.824 Lab1 預熱

MIT6.824 Lab1 預熱

假設有個字串:

var str = "The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines."
複製程式碼

需要統計裡面每個字母出現的次數。最直觀簡單的做法就是利用一個 map,從開始到末尾讀這個字串,並把字母作為 key,出現的次數作為 value。Map 中包含 key 的時候,value + 1,Map 中沒有 key 的時候預設 1。最後讀完這個字串就 OK。

var m = make(map[string]int)
temp := strings.Split(str,"")

for _,c := range temp {
    if !unicode.IsLetter([]rune(c)[0]) {
        continue
    }
    if count,ok := m[c]; ok {
        m[c] = count + 1
    } else {
        m[c] = 1
    }
}
複製程式碼
[M:3 R:1 y:7 o:13 v:1 e:26 h:7 l:10 i:14 r:15 T:1 p:13 d:1 u:6 c:8 b:5 s:14 g:4 a:17 f:5 m:7 t:20 B:1 I:1 n:10]
複製程式碼

在現實世界中,這個 str 可能非常巨大,所以有時候我們需要將源文字拆分成多個小的字串,然後多個執行緒同時處理,每個執行緒計算得到當前的中間結果,最後合併到一起。

上述的過程在函式語言程式設計中可以被抽象為 Map 和 Reduce 兩個函式。其中 Map 函式是把一個陣列的每個元素按照相同的邏輯處理之後返回的結果,Reduce 函式是把所有元素整合起來得到結果。通常這個兩個函式的引數都是函式,Map 的返回值一般也是陣列,Reduce 的返回值可能是各種型別。

為了在單機上實現出併發處理的效果,可以用 Go 自帶的 goroutine 來實現。下面把拆分的工作省略,直接進入主題

接下來用 4 個 goroutine 同時處理這些 string,每個做 goroutine 利用 單機序列版

的邏輯,生產出一個小規模的中間內容。隨後把每個中間內容都整合起來得到最終值。接下來需要考慮

  • Go 天生支援 CSP 程式設計模型,所以利用 channel 做通訊沒有問題
  • 是否有 data race
package main

import (
	"strings"
	"sync"
	"unicode"
)

type ResultMap struct {
	sync.Mutex
	result map[string]int
}

func main()  {
	str1 := "The MapReduce library in the user program first"
	str2 := "splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB)"
	str3 := "per piece (controllable by the user via an optional parameter)."
	str4 := "It then starts up many copies of the program on a cluster of machines."

	strs := []string {str1,str2,str3,str4}

	// 主執行緒需要阻塞直到所有的 reduce 都結束
	var waitGroup sync.WaitGroup
	waitGroup.Add(len(strs))

	c := make(chan map[string]int)

	res := new(ResultMap)
	res.result = make(map[string]int)

	for _,str := range strs {
		go doMap(str,c)
		go doReduce(c,res,&waitGroup)
	}

	waitGroup.Wait()

	sortPrintMap(res.result)

}

// 生產出對應的 kv 傳遞給 channel
func doMap(str string,c chan map[string]int) {
	temp := strings.Split(str,"")
	m := make(map[string]int)

	for _,c := range temp {
		if !unicode.IsLetter([]rune(c)[0]) {
			continue
		}
		if count,ok := m[c]; ok {
			m[c] = count + 1
		} else {
			m[c] = 1
		}
	}
	c <- m
}

// 合併
func doReduce(c chan map[string]int,res *ResultMap,group *sync.WaitGroup) {
	res.Lock()
	defer res.Unlock()
	for k,v := range <- c {
		if count,ok := res.result[k]; ok {
			res.result[k] = count + v
		} else {
			res.result[k] = v
		}
	}
	group.Done()
}
複製程式碼

檢查一下結果 (Map 的 key 本身是無序的,這裡是排好序之後的)

[M:3 R:1 y:7 o:13 v:1 e:26 h:7 l:10 i:14 r:15 T:1 p:13 d:1 u:6 c:8 b:5 s:14 g:4 a:17 f:5 m:7 t:20 B:1 I:1 n:10]
複製程式碼

結果無誤之後,這個問題可以再深入

  • 上述的 reduce 和 map 是單機上的,之間的資料共享用了 channel,如果是物理隔離的場景下,如何用別的東西做資料共享?
  • 任何一個子任務都有可能因為各種原因掛掉,如何在某個子任務掛掉的情況下,系統的準確性不受影響,甚至能自愈?
  • 上述的 goroutine 在執行結束之後就會被排程器回收,但實際上因為 map 總是會比 reduce 先結束,那麼後期的過程實際上可以有更多的 goroutine 可以參與到 reduce 任務中 r 如何實現這種排程讓資源可以被更加充分的利用?