1. 程式人生 > >[分散式系統學習] 6.824 LEC2 RPC和執行緒 筆記

[分散式系統學習] 6.824 LEC2 RPC和執行緒 筆記

6.824的課程通常是在課前讓你做一些準備。一般來說是先讀一篇論文,然後請你提一個問題,再請你回答一個問題。然後上課,然後佈置Lab。

第二課的準備-Crawler

第二課的準備不是論文,是讓你實現Go Tour裡面的crawler。Go Tour裡面原有的實現是序列的,並且可能爬到相同的url。要求讓你並行並去重。

簡單想法就是,為了實現並行,爬每個url都是用goroutine;為了實現去重,每次開爬就把url放到map中。

不過這裡有個知識點,Crawler函式最後返回成功,所有url都要爬完,所以需要一個機制去等待所有goroutine完成。查了一下可以用sync.WaitGroup。那一個直觀的實現:

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
var collector Collector;
collector.fetchedUrl = make(map[string]bool)
CrawlInt(url, depth, fetcher, &collector)
collector.Wait()
}

type Collector struct {
sync.Mutex
sync.WaitGroup
fetchedUrl map[string]bool
}

func CrawlInt(url string, depth int, fetcher Fetcher, collector *Collector) {
if depth <= 0 {
return
}
collector.Lock()
if _, ok := collector.fetchedUrl[url]; ok {
//visited,
collector.Unlock()
return
}
collector.fetchedUrl[url] = true
collector.Unlock()
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
collector.Add(len(urls))
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
go func (u string) {
CrawlInt(u, depth-1, fetcher, collector)
collector.Done()
}(u)
}
return
}

不過看到了答案,覺得答案很簡潔,不僅沒有用到WaitGroup,甚至連一個Lock都沒有使用。

//
// Concurrent crawler with channels
//

func dofetch(url1 string, ch chan []string, fetcher Fetcher) {
	body, urls, err := fetcher.Fetch(url1)
	if err != nil {
		fmt.Println(err)
		ch <- []string{}
	} else {
		fmt.Printf("found: %s %q\n", url1, body)
		ch <- urls
	}
}

func master(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
	for urls := range ch {
		for _, u := range urls {
			if _, ok := fetched[u]; ok == false {
				fetched[u] = true
				n += 1
				go dofetch(u, ch, fetcher)
			}
		}
		n -= 1
		if n == 0 {
			break
		}
	}
}

func CrawlConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
	go func() {
		ch <- []string{url}
	}()
	master(ch, fetcher)
}

Crawler函式是那個CrawlConcurrentChannel。ch裡面放的是每次fetch返回的頁面陣列。為什麼不用到Lock呢?因為fetched map的判斷和加入都在主執行緒中。

ch裡面的urls當然可能重複,但是在主執行緒中已經判斷過了不會重複fetch。

而通過n來判斷是否所有頁面都被爬取了。所以有n==sizeof (ch) == sizeof (fetched)。這裡的sizeof指的是所有放入的,不是某一時刻的。

Go 的RPC

我們在前面一個Lab裡面已經遇到過了。覺得有點像Soap的方式,不過完全沒有Soap那麼複雜,需要定義wsdl。

至少傳送一次 vs 至多傳送一次

至少傳送一次:RPC lib 等待返回,如果超時,再發。這樣多嘗試幾次,始終沒有返回,就報錯。

這樣能解決問題麼?如果是傳送的剋扣餘額會出現什麼問題?

所以“至少傳送一次”對於只讀操作,和可重入操作是有效的。比如我們上一個Lab中的Map和Reduce,都是可重入的。

至多傳送一次:問題在於如何檢測重複請求。

client可以傳送一個唯一的ID(XID)用於驗證重複。伺服器做如下處理。

server:
    if seen[xid]:
      r = old[xid]
    else
      r = handler()
      old[xid] = r
      seen[xid] = true

這裡要處理的問題是:

1. client怎麼保證XID唯一?現在UUID可以做到,另外也可以通過ip地址加上序列號來做hash值。

2. 伺服器要在某時刻清理調之前的請求,否則每個請求都放到seen map裡面,那要爆掉了。client可以在每條RPC中都包含一個”已經收到#<X的回覆“的資訊,這樣,伺服器就可以拋棄它們。

3. 伺服器正在處理某個request,但是新的request已經進來了,伺服器不想做第二次,那麼他可以設定一個”pending“flag,讓新的request等待或者忽略。

Go語言的RPC策略是”至多傳送一次“。