以太坊事件機制以及優化
1 以太坊的事件機制
以太坊go-ethereum原始碼中傳送事件除了用常規的通道以外,還用了封裝的Feed結構來執行事件的訂閱和傳送。以太坊中使用了大量的Feed來處理事件。使用Feed訂閱事件的步驟是:
- 定義一個通道ch:ch=make(someType)
- 定義一個Feed物件feed
- Feed訂閱通道ch:feed.Subscribe(ch)
- 使用feed傳送資料給通道:feed.Send(someTypeData)
- ch接收資料:ret<-ch
一個feed可以訂閱多個通道,當使用feed傳送資料後,所有的通道都將接收到資料。下文將解讀Feed的原始碼,在進入Feed原始碼解讀之前我們先介紹一下go中的reflect包中的SelectCase。
2 使用reflect.SelectCase來監聽多個通道
對於多個通道ch1,ch2,ch3,使用傳統的Select方式來監聽:
package main import ( "fmt" "strconv" ) func main() { var chs1 = make(chan int) var chs2 = make(chan float64) var chs3 = make(chan string) var ch4close = make(chan int) defer close(ch4close) go func(c chan int, ch4close chan int) { for i := 0; i < 5; i++ { c <- i } close(c) ch4close <- 1 }(chs1, ch4close) go func(c chan float64, ch4close chan int) { for i := 0; i < 5; i++ { c <- float64(i) + 0.1 } close(c) ch4close <- 1 }(chs2, ch4close) go func(c chan string, ch4close chan int) { for i := 0; i < 5; i++ { c <- "string:" + strconv.Itoa(i) } close(c) ch4close <- 1 }(chs3, ch4close) done := 0 finished := 0 for finished < 3 { select { case v, ok := <-chs1: if ok { done = done + 1 fmt.Println(0, v) } case v, ok := <-chs2: if ok { done = done + 1 fmt.Println(1, v) } case v, ok := <-chs3: if ok { done = done + 1 fmt.Println(2, v) } case _, ok := <- ch4close: if ok { finished = finished+1 } } } fmt.Println("Done", done) }
使用reflect的方式來監聽:
package main import ( "fmt" "reflect" "strconv" ) func main() { var chs1 = make(chan int) var chs2 = make(chan float64) var chs3 = make(chan string) var ch4close = make(chan int) defer close(ch4close) go func(c chan int, ch4close chan int) { for i := 0; i < 5; i++ { c <- i } close(c) ch4close <- 1 }(chs1, ch4close) go func(c chan float64, ch4close chan int) { for i := 0; i < 5; i++ { c <- float64(i) + 0.1 } close(c) ch4close <- 1 }(chs2, ch4close) go func(c chan string, ch4close chan int) { for i := 0; i < 5; i++ { c <- "string:" + strconv.Itoa(i) } close(c) ch4close <- 1 }(chs3, ch4close) var selectCase = make([]reflect.SelectCase, 4) selectCase[0].Dir = reflect.SelectRecv selectCase[0].Chan = reflect.ValueOf(chs1) selectCase[1].Dir = reflect.SelectRecv selectCase[1].Chan = reflect.ValueOf(chs2) selectCase[2].Dir = reflect.SelectRecv selectCase[2].Chan = reflect.ValueOf(chs3) selectCase[3].Dir = reflect.SelectRecv selectCase[3].Chan = reflect.ValueOf(ch4close) done := 0 finished := 0 for finished < len(selectCase)-1 { chosen, recv, recvOk := reflect.Select(selectCase) if recvOk { done = done+1 switch chosen { case 0: fmt.Println(chosen, recv.Int()) case 1: fmt.Println(chosen, recv.Float()) case 2: fmt.Println(chosen, recv.String()) case 3: finished = finished+1 done = done-1 // fmt.Println("finished\t", finished) } } } fmt.Println("Done", done) }
這裡構建了一個reflect.SelectCase陣列selectCase,將要監聽的通道新增到陣列中。監聽時只要使用reflect.Select(selectCase)就可以監聽所有通道的訊息。當通道數多的時候,用SelectCase的方式將會更簡潔優雅。
3 Feed原始碼解讀
Feed結構的原始碼在event/feed.go中。
Feed結構
type Feed struct {
once sync.Once // ensures that init only runs once
sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases.
removeSub chan interface{} // interrupts Send
sendCases caseList // the active set of select cases used by Send
// The inbox holds newly subscribed channels until they are added to sendCases.
mu sync.Mutex
inbox caseList
etype reflect.Type
closed bool
}
type caseList []reflect.SelectCase
Feed結構核心的是inbox成員,它是一個SelectCase的陣列,儲存了該Feed訂閱的所有通道。sendCase是所有活躍的通道陣列。sendLock通道用來作為鎖來保護sendCase。
初始化函式
func (f *Feed) init() {
f.removeSub = make(chan interface{})
f.sendLock = make(chan struct{}, 1)
f.sendLock <- struct{}{}
f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}
這裡sendLock被設定成有容量為1的緩衝通道。並且給sendLock先寫入了一個值。sendCases預先加入了removeSub通道作為第一個通道。
通道訂閱函式
//這個通道需要有足夠的緩衝空間以避免阻塞其它訂閱者。速度慢的訂閱者不會被丟棄
func (f *Feed) Subscribe(channel interface{}) Subscription {
f.once.Do(f.init)
chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
// Add the select case to the inbox.
// The next Send will add it to f.sendCases.
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub
}
這個函式做的事情很簡單,就是根據通道ch構造一個SelectCase物件,然後將其加入到inbox陣列中。這樣就完成了通道的訂閱。
傳送函式
// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)//重新初始化,onece.Do保證只會執行一次
<-f.sendLock //讀sendLock通道,若sendLock為空則會堵塞
// Add new cases from the inbox after taking the send lock.
f.mu.Lock() //訪問公共變數加鎖
f.sendCases = append(f.sendCases, f.inbox...)//將inbox注入到sendCase
f.inbox = nil
if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{} //出錯了,退出前先寫sendLock以免下次send操作堵塞
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()
// 給所有通道設定要傳送的資料
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}
// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
// of sendCases. When a send succeeds, the corresponding case moves to the end of
// 'cases' and it shrinks by one element.
cases := f.sendCases
for {
// Fast path: try sending without blocking before adding to the select set.
// This should usually succeed if subscribers are fast enough and have free
// buffer space.
for i := firstSubSendCase; i < len(cases); i++ {
//首先使用TrySend進行傳送,這是一種非阻塞操作。當訂閱者足夠快時一般能夠立即成功
if cases[i].Chan.TrySend(rvalue) {
nsent++
cases = cases.deactivate(i)//傳送成功,後移該通道
i--
}
}
if len(cases) == firstSubSendCase {//所有通道傳送完成,退出
break
}
// Select on all the receivers, waiting for them to unblock.
chosen, recv, _ := reflect.Select(cases)//等待通道返回
//<-f.removeSub
if chosen == 0 {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
// Forget about the sent value and hand off the send lock.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
f.sendLock <- struct{}{}//返回時寫入sendLock,為下次傳送做準備
return nsent
}
send函式使用通道的trySend方法來發送,在正常情況下能夠立即傳送成功,但是當接收通道堵塞的時候,則需要用Select方法這種堵塞的方式等待通道傳送成功。在最後返回時,寫入sendLock,為下次傳送做準備。
4 send函式存在的問題及優化
我們看到send函式使用了sendLock通道,它是一個容量為1的通道。在send函式最開始,讀出sendLock通道,如果這個時候sendLock為空,則send函式就會堵塞。所以在send函式最後,寫入了sendLock通道,這樣下次傳送去讀sendLock時就不會堵塞。看起來好像沒有問題,但是理想很豐滿,顯示有時候會骨感。這裡存在的問題就是chosen, recv, _ := reflect.Select(cases)這行程式碼可能會堵塞,導致for迴圈一值退不出,send函式發生堵塞,導致sendLock不會被寫入。從而導致了死鎖。下次send傳送就會被堵塞。
這裡使用sendLock是為了保護公共的sendCase資料,解決思路是去掉sendCase,不適用全域性的sendCase,而使用區域性變數。這樣就不用考慮同步的問題了。改造後的send函式:
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)
//<-f.sendLock
sendCases := caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
sendCases = append(sendCases, f.inbox...)
// Set the sent value on all channels.
for i := firstSubSendCase; i < len(sendCases); i++ {
sendCases[i].Send = rvalue
}
// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
// of sendCases. When a send succeeds, the corresponding case moves to the end of
// 'cases' and it shrinks by one element.
cases := sendCases
//LOOP:
for {
// Fast path: try sending without blocking before adding to the select set.
// This should usually succeed if subscribers are fast enough and have free
// buffer space.
for i := firstSubSendCase; i < len(cases); i++ {
if cases[i].Chan.TrySend(rvalue) {
nsent++
cases = cases.deactivate(i)
i--
}
}
if len(cases) == firstSubSendCase {
break
}
// Select on all the receivers, waiting for them to unblock.
chosen, recv, _ := reflect.Select(cases)
//<-f.removeSub
if chosen == 0 {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
// Forget about the sent value and hand off the send lock.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
//f.sendLock <- struct{}{}
return nsent
}
某次send可能會堵塞,但是不會影響下次send傳送。
5 go-ethereum原始碼中使用send的坑
我們看core/blockchain.go中的傳送函式PostChainEvents():
// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
log.Info("lzj-log PostChainEvents", "events len",len(events))
// post event logs for further processing
if logs != nil {
bc.logsFeed.Send(logs)
}
for _, event := range events {
switch ev := event.(type) {
case ChainEvent:
log.Info("lzj-log send ChainEvent")
bc.chainFeed.Send(ev)
case ChainHeadEvent:
log.Info("lzj-log send ChainHeadEvent")
bc.chainHeadFeed.Send(ev)
case ChainSideEvent:
log.Info("lzj-log send ChainSideEvent")
bc.chainSideFeed.Send(ev)
}
}
}
這個函式是在for迴圈中先後傳送了ChainEvent、ChainHeadEvent和ChainSideEvent事件。在insert函式中呼叫了這個 函式。但是這裡有個問題,如果前一個事件傳送堵塞了,後面的事件傳送就不會執行。需要把Send函式放到單獨的協程中去。改成這樣可以防止堵塞的問題:
// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
log.Info("lzj-log PostChainEvents", "events len",len(events))
// post event logs for further processing
if logs != nil {
bc.logsFeed.Send(logs)
}
for _, event := range events {
switch ev := event.(type) {
case ChainEvent:
log.Info("lzj-log send ChainEvent")
go bc.chainFeed.Send(ev)
case ChainHeadEvent:
log.Info("lzj-log send ChainHeadEvent")
go bc.chainHeadFeed.Send(ev)
case ChainSideEvent:
log.Info("lzj-log send ChainSideEvent")
go bc.chainSideFeed.Send(ev)
}
}
}
在go裡面使用通道要發非常小心,因為很容易引起堵塞從而達不到自己期望的結果。