golang的chan有趣用法
寫這個部落格的背景是我面試一家公司,這家公司的CTO給我出了一道我認為挺有意思的題,題的大概是這樣的:
// 抽象一個柵欄 type Barrier interface { Wait () } // 建立柵欄物件 func NewBarrier (n int) Barrier { } // 柵欄的實現類 type barrier struct { } // 測試程式碼 func main () { // 建立柵欄物件 b := NewBarrier(10) // 達到的效果:前9個協程呼叫Wait()阻塞,第10個呼叫後10個協程全部喚醒 for i:=0; i<10; i++ { go b.Wait() } }
需要對上面的NewBarrier()函式和barrier這個類進行修改,達到預期的效果。而且還要有條件約束,就是不能用任何同步相關的操作,但可以用chan,前提是無緩衝模式的。
在做這個問題的時候我一直在想C裡面有人實現無鎖佇列,利用的CPU的CAS指令可以不用加鎖就可以對變數實現原子操作。先不說golang中是不是支援此類的語言,其實原子操作也會是同步操作的範疇,本身就已經犯規了,所以還是老老實實的用chan是實現吧。對chan有了解的都知道,沒有緩衝的生產和消費必須同時呼叫二者都會被喚醒,否則任意一方都會被阻塞。至少呼叫Wait()阻塞的方法有一個選擇了,那我們的先可以這麼實現:
type barrier struct { chCount chan struct{} // 所有呼叫Wait()函式的先通過這個chan阻塞 count int // 記住數量,阻塞超過這個量就可以啟用所有協程了 } // 按照上面的定義,建構函式就可以這麼寫了 func NewBarrier(n int) Barrier { b := &barrier{count: n, chCount: make(chan struct{}))} } // Wait()可以先寫成這樣 func (b *barrier) Wait() { b.chCount <- struct{}{} }
上面的程式碼至少實現了所有的協程都能阻塞了,那麼問題來了,我通過什麼方式計數呢?因為chan沒有緩衝,沒法用cap()函式獲取數量。如果定義全域性變數的方式計數,沒有鎖或者原子操作是沒法正確統計計數的,唯一的方式就是從chan中一個一個的讀出來計數。那麼就會增加如下程式碼:
func NewBarrier(n int) Barrier {
b := &barrier{count: n, chCount: make(chan struct{}), chSync: make(chan struct{})}
go b.Sync() // 增加一個協程用來後臺計數
return b
}
// 後臺計數的協程
func (b *barrier) Sync() {
count := 0
// chan也可以通過range 操作的,感興趣的同學可以看我的《深入淺出golang之chan》
for range b.chCount {
// 累計計數
count++
if count >= b.count {
// 這裡就是統計滿足條件的地方了
}
}
}
現在我們面臨了新的問題,當後臺這個協程每次從chan讀取一個元素的時候,那個傳送該資料阻塞的協程就會被喚醒,這個就沒法滿足我們的要求了。有什麼方法能讓傳送協程不被啟用麼,如果是當前狀態的chan是無解的。那隻能再想一個方法就是讓那個寫入協程再次阻塞,起初我的想法可以用Sleep()函式,後臺協程累計計數達到條件後設置一個標記,寫入協程通過迴圈Sleep()一段時間判斷這個標記就可以搞定了。但是作為老司機的我根本沒法接受這麼醜陋的程式碼,我再用一個chan阻塞不就可以了麼?當統計計數滿足條件,直接close掉這個chan,所有的協程就自動激活了,所以程式碼就變成了這樣:
type barrier struct {
chCount chan struct{}
chSync chan struct{} // 增加一個chan
count int
}
func NewBarrier(n int) Barrier {
b := &barrier{count: n, chCount: make(chan struct{}), chSync: make(chan struct{})}
go b.Sync()
return b
}
func (b *barrier) Wait() {
b.chCount <- struct{}{}
<-b.chSync // 再次阻塞
}
func (b *barrier) Sync() {
count := 0
for range b.chCount {
count++
if count >= b.count {
close(b.chSync) // close這個chan所有阻塞協程都會被啟用
break
}
}
}
完美解決問題,如果僅僅是解決這個問題我就不會寫這個文章了。在這個事情上我想到了兩個chan可以實現很多很有意思的功能,上面提到的問題就是其中之一。我再說一個例子:設計一個分發器,當有資料進入分發器後,需要將資料分發到多個處理器處理,每個處理器可以想象為一個協程,處理器在沒有資料的時候要阻塞。我相信很多人肯定會設計成這樣:
這種設計的最大的問題在於多個Processor處理時長不同會造成木桶效應,多個Processor會被一個Processor拖累。那有人肯定會說可以給chan加緩衝啊,試問緩衝設計多大合適呢?如果真存在一個處理非常慢的Processor多大的緩衝都無濟於事,所以應該設計成這樣:
有人肯定會說這個chan帶buffer沒啥區別啊,我告訴你這區別大了!用一個專用的協程實現從chan2讀取資料放到緩衝中,然後再衝緩衝中讀取資料放到chan3中,全程chan2和chan3都無需緩衝。這樣分發器不會由於任何一個Processor慢被拖累,同時緩衝Buffer可以設計成彈性的Buffer,不會被設定成一個固定的值。還有誰有非常有趣的關於chan使用,都可以提出來大家一起分享!