[分散式系統學習] 6.824 LEC2 RPC和執行緒 筆記
6.824的課程通常是在課前讓你做一些準備。一般來說是先讀一篇論文,然後請你提一個問題,再請你回答一個問題。然後上課,然後佈置Lab。
第二課的準備-Crawler
第二課的準備不是論文,是讓你實現Go Tour裡面的crawler。Go Tour裡面原有的實現是序列的,並且可能爬到相同的url。要求讓你並行並去重。
簡單想法就是,為了實現並行,爬每個url都是用goroutine;為了實現去重,每次開爬就把url放到map中。
不過這裡有個知識點,Crawler函式最後返回成功,所有url都要爬完,所以需要一個機制去等待所有goroutine完成。查了一下可以用sync.WaitGroup。那一個直觀的實現:
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
var collector Collector;
collector.fetchedUrl = make(map[string]bool)
CrawlInt(url, depth, fetcher, &collector)
collector.Wait()
}
type Collector struct {
sync.Mutex
sync.WaitGroup
fetchedUrl map[string]bool
}
func CrawlInt(url string, depth int, fetcher Fetcher, collector *Collector) {
if depth <= 0 {
return
}
collector.Lock()
if _, ok := collector.fetchedUrl[url]; ok {
//visited,
collector.Unlock()
return
}
collector.fetchedUrl[url] = true
collector.Unlock()
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
collector.Add(len(urls))
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
go func (u string) {
CrawlInt(u, depth-1, fetcher, collector)
collector.Done()
}(u)
}
return
}
不過看到了答案,覺得答案很簡潔,不僅沒有用到WaitGroup,甚至連一個Lock都沒有使用。
// // Concurrent crawler with channels // func dofetch(url1 string, ch chan []string, fetcher Fetcher) { body, urls, err := fetcher.Fetch(url1) if err != nil { fmt.Println(err) ch <- []string{} } else { fmt.Printf("found: %s %q\n", url1, body) ch <- urls } } func master(ch chan []string, fetcher Fetcher) { n := 1 fetched := make(map[string]bool) for urls := range ch { for _, u := range urls { if _, ok := fetched[u]; ok == false { fetched[u] = true n += 1 go dofetch(u, ch, fetcher) } } n -= 1 if n == 0 { break } } } func CrawlConcurrentChannel(url string, fetcher Fetcher) { ch := make(chan []string) go func() { ch <- []string{url} }() master(ch, fetcher) }
Crawler函式是那個CrawlConcurrentChannel。ch裡面放的是每次fetch返回的頁面陣列。為什麼不用到Lock呢?因為fetched map的判斷和加入都在主執行緒中。
ch裡面的urls當然可能重複,但是在主執行緒中已經判斷過了不會重複fetch。
而通過n來判斷是否所有頁面都被爬取了。所以有n==sizeof (ch) == sizeof (fetched)。這裡的sizeof指的是所有放入的,不是某一時刻的。
Go 的RPC
我們在前面一個Lab裡面已經遇到過了。覺得有點像Soap的方式,不過完全沒有Soap那麼複雜,需要定義wsdl。
至少傳送一次 vs 至多傳送一次
至少傳送一次:RPC lib 等待返回,如果超時,再發。這樣多嘗試幾次,始終沒有返回,就報錯。
這樣能解決問題麼?如果是傳送的剋扣餘額會出現什麼問題?
所以“至少傳送一次”對於只讀操作,和可重入操作是有效的。比如我們上一個Lab中的Map和Reduce,都是可重入的。
至多傳送一次:問題在於如何檢測重複請求。
client可以傳送一個唯一的ID(XID)用於驗證重複。伺服器做如下處理。
server: if seen[xid]: r = old[xid] else r = handler() old[xid] = r seen[xid] = true
這裡要處理的問題是:
1. client怎麼保證XID唯一?現在UUID可以做到,另外也可以通過ip地址加上序列號來做hash值。
2. 伺服器要在某時刻清理調之前的請求,否則每個請求都放到seen map裡面,那要爆掉了。client可以在每條RPC中都包含一個”已經收到#<X的回覆“的資訊,這樣,伺服器就可以拋棄它們。
3. 伺服器正在處理某個request,但是新的request已經進來了,伺服器不想做第二次,那麼他可以設定一個”pending“flag,讓新的request等待或者忽略。
Go語言的RPC策略是”至多傳送一次“。