1. 程式人生 > >golang的chan有趣用法

golang的chan有趣用法

寫這個部落格的背景是我面試一家公司,這家公司的CTO給我出了一道我認為挺有意思的題,題的大概是這樣的:

// 抽象一個柵欄
type Barrier interface {
    Wait ()
}
// 建立柵欄物件
func NewBarrier (n int) Barrier {
   
}
// 柵欄的實現類
type barrier struct {

}
// 測試程式碼
func main () {
    // 建立柵欄物件
    b := NewBarrier(10)
    // 達到的效果:前9個協程呼叫Wait()阻塞,第10個呼叫後10個協程全部喚醒
    for i:=0; i<10; i++ {
        go b.Wait()
    }
}

需要對上面的NewBarrier()函式和barrier這個類進行修改,達到預期的效果。而且還要有條件約束,就是不能用任何同步相關的操作,但可以用chan,前提是無緩衝模式的。

在做這個問題的時候我一直在想C裡面有人實現無鎖佇列,利用的CPU的CAS指令可以不用加鎖就可以對變數實現原子操作。先不說golang中是不是支援此類的語言,其實原子操作也會是同步操作的範疇,本身就已經犯規了,所以還是老老實實的用chan是實現吧。對chan有了解的都知道,沒有緩衝的生產和消費必須同時呼叫二者都會被喚醒,否則任意一方都會被阻塞。至少呼叫Wait()阻塞的方法有一個選擇了,那我們的先可以這麼實現:

type barrier struct {
    chCount chan struct{} // 所有呼叫Wait()函式的先通過這個chan阻塞
    count   int           // 記住數量,阻塞超過這個量就可以啟用所有協程了
}
// 按照上面的定義,建構函式就可以這麼寫了
func NewBarrier(n int) Barrier {
    b := &barrier{count: n, chCount: make(chan struct{}))}
}
// Wait()可以先寫成這樣
func (b *barrier) Wait() {
    b.chCount <- struct{}{}
}

上面的程式碼至少實現了所有的協程都能阻塞了,那麼問題來了,我通過什麼方式計數呢?因為chan沒有緩衝,沒法用cap()函式獲取數量。如果定義全域性變數的方式計數,沒有鎖或者原子操作是沒法正確統計計數的,唯一的方式就是從chan中一個一個的讀出來計數。那麼就會增加如下程式碼:

func NewBarrier(n int) Barrier {
    b := &barrier{count: n, chCount: make(chan struct{}), chSync: make(chan struct{})}
    go b.Sync() // 增加一個協程用來後臺計數
    return b
}
// 後臺計數的協程
func (b *barrier) Sync() {
    count := 0
    // chan也可以通過range 操作的,感興趣的同學可以看我的《深入淺出golang之chan》
    for range b.chCount {
        // 累計計數
        count++
        if count >= b.count {
            // 這裡就是統計滿足條件的地方了
        }
    }
}

現在我們面臨了新的問題,當後臺這個協程每次從chan讀取一個元素的時候,那個傳送該資料阻塞的協程就會被喚醒,這個就沒法滿足我們的要求了。有什麼方法能讓傳送協程不被啟用麼,如果是當前狀態的chan是無解的。那隻能再想一個方法就是讓那個寫入協程再次阻塞,起初我的想法可以用Sleep()函式,後臺協程累計計數達到條件後設置一個標記,寫入協程通過迴圈Sleep()一段時間判斷這個標記就可以搞定了。但是作為老司機的我根本沒法接受這麼醜陋的程式碼,我再用一個chan阻塞不就可以了麼?當統計計數滿足條件,直接close掉這個chan,所有的協程就自動激活了,所以程式碼就變成了這樣:


type barrier struct {
    chCount chan struct{}
    chSync  chan struct{}     // 增加一個chan
    count   int
}
func NewBarrier(n int) Barrier {
    b := &barrier{count: n, chCount: make(chan struct{}), chSync: make(chan struct{})}
    go b.Sync()
    return b
}
func (b *barrier) Wait() {
    b.chCount <- struct{}{}
    <-b.chSync                // 再次阻塞
}
func (b *barrier) Sync() {
    count := 0
    for range b.chCount {
        count++
        if count >= b.count {
            close(b.chSync)   // close這個chan所有阻塞協程都會被啟用
            break
        }
    }
}

完美解決問題,如果僅僅是解決這個問題我就不會寫這個文章了。在這個事情上我想到了兩個chan可以實現很多很有意思的功能,上面提到的問題就是其中之一。我再說一個例子:設計一個分發器,當有資料進入分發器後,需要將資料分發到多個處理器處理,每個處理器可以想象為一個協程,處理器在沒有資料的時候要阻塞。我相信很多人肯定會設計成這樣:

                                     

這種設計的最大的問題在於多個Processor處理時長不同會造成木桶效應,多個Processor會被一個Processor拖累。那有人肯定會說可以給chan加緩衝啊,試問緩衝設計多大合適呢?如果真存在一個處理非常慢的Processor多大的緩衝都無濟於事,所以應該設計成這樣:

                     

有人肯定會說這個chan帶buffer沒啥區別啊,我告訴你這區別大了!用一個專用的協程實現從chan2讀取資料放到緩衝中,然後再衝緩衝中讀取資料放到chan3中,全程chan2和chan3都無需緩衝。這樣分發器不會由於任何一個Processor慢被拖累,同時緩衝Buffer可以設計成彈性的Buffer,不會被設定成一個固定的值。還有誰有非常有趣的關於chan使用,都可以提出來大家一起分享!