以太坊 p2p Server 原理及實現
以太坊p2p原理與實現
區塊鏈技術的去中心依賴於底層組網技術,以太坊的底層實現了p2pServer,大約可以分為這樣三層。
- 底層路由表。封裝了kad路由,節點的資料結構以及計算記錄,節點搜尋,驗證等功能。
- 中層peer抽象,message開放傳送介面,server對外提供peer檢測,初始化,事件訂閱,peer狀態查詢,啟動,停止等功能
- 以太坊最上層peer,peerset再封裝,通過協議的Run函式,在中層啟動peer時,獲取peer,最終通過一個迴圈擷取穩定peer,包裝在peerset中使用。
底層路由表
這裡簡化問題僅討論Node Discovery Protocol。 這一層維護了一個buckets桶,總共有17個桶,每個桶有16個節點和10個替換節點。 Node放入時先要計算hash和localNode的距離。再按距離選擇一個桶放進去,取的時候逐個計算target和每個桶中物件的舉例,詳細參考closest函式,後面會貼出來。
距離公式滿足:f(x,y)=256-8*n-map(x[n+1]^y[n+1]) 注:n為相同節點數量 map為一個負相關的對映關係。
簡單來說就是相似越多,值越小。細節參考Node.go的logdist函式。 這裡需要了解演算法Kademlia,
.
├── database.go //封裝node資料庫相關操作
├── node.go //節點資料結構
├── ntp.go //同步時間
├── table.go //路由表
├── udp.go //網路相關操作
其中最重要的就是table物件,table公共方法有:
- newTable 例項建立
- Self local節點獲取
- ReadRandomNodes 隨機讀取幾個節點
- Close 關閉
- Resolve 在周邊查詢某個節點
- Lookup 查詢某個節點的鄰近節點
逐個來分析這些方法:
newTable
- 1:生成物件例項(獲取資料庫客戶端,LocalNode etc)
// If no node database was given, use an in-memory one
db, err := newNodeDB(nodeDBPath, Version, ourID)
if err != nil {
return nil, err
}
tab := &Table{
net: t,
db: db,
self: NewNode(ourID, ourAddr.IP, uint16 (ourAddr.Port), uint16(ourAddr.Port)),
bonding: make(map[NodeID]*bondproc),
bondslots: make(chan struct{}, maxBondingPingPongs),
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
}
- 2:載入引導節點,初始化k桶。
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
}
for i := 0; i < cap(tab.bondslots); i++ {
tab.bondslots <- struct{}{}
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
- 3:將節點放入到桶裡,生成一條協程用於重新整理,驗證節點。
tab.seedRand()
tab.loadSeedNodes(false) //載入種子節點
// Start the background expiration goroutine after loading seeds so that the search for
// seed nodes also considers older nodes that would otherwise be removed by the
// expiration.
tab.db.ensureExpirer()
go tab.loop()
載入種子節點
func(tab *Table)loadSeedNodes(bond bool) {
seeds := tab.db.querySeeds(seedCount, seedMaxAge)
//資料庫中的種子節點和引導節點合併
seeds = append(seeds, tab.nursery...)
if bond {
seeds = tab.bondall(seeds) //節點驗證
}
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func()interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
tab.add(seed) //節點入桶
}
}
節點入桶,同時也要檢查ip等限制。
func(tab *Table)add(new *Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(new.sha) //獲取當前節點對應的桶
if !tab.bumpOrAdd(b, new) {
// Node is not in table. Add it to the replacement list.
tab.addReplacement(b, new)
}
}
桶的選擇
func (tab *Table) bucket(sha common.Hash) *bucket {
d := logdist(tab.self.sha, sha) //計算hash舉例
if d <= bucketMinDistance {
//這裡按演算法來看,只要hash前三位相等就會到第一個buckets
return tab.buckets[0]
}
return tab.buckets[d-bucketMinDistance-1]
}
Resolve
根據Node的Id查詢Node,先在當前的桶裡面查詢,查詢一遍之後沒找到就在周邊的節點裡面搜尋一遍再找。
// Resolve searches for a specific node with the given ID.
// It returns nil if the node could not be found.
func(tab *Table)Resolve(targetID NodeID) *Node {
// If the node is present in the local table, no
// network interaction is required.
hash := crypto.Keccak256Hash(targetID[:])
tab.mutex.Lock()
//查詢最近節點
cl := tab.closest(hash, 1)
tab.mutex.Unlock()
if len(cl.entries) > 0 && cl.entries[0].ID == targetID {
return cl.entries[0]
}
// Otherwise, do a network lookup.
//不存在 搜尋鄰居節點
result := tab.Lookup(targetID)
for _, n := range result {
if n.ID == targetID {
return n
}
}
return nil
}
這裡需要理解的函式是 closest,遍歷所有桶的所有節點,查詢最近的一個
// closest returns the n nodes in the table that are closest to the
// given id. The caller must hold tab.mutex.
func(tab *Table)closest(target common.Hash, nresults int) *nodesByDistance {
// This is a very wasteful way to find the closest nodes but
// obviously correct. I believe that tree-based buckets would make
// this easier to implement efficiently.
close := &nodesByDistance{target: target}
for _, b := range tab.buckets {
for _, n := range b.entries {
close.push(n, nresults)
}
}
return close
}
func(h *nodesByDistance)push(n *Node, maxElems int) {
ix := sort.Search(len(h.entries), func(i int)bool {
return distcmp(h.target, h.entries[i].sha, n.sha) > 0
})
if len(h.entries) < maxElems {
h.entries = append(h.entries, n)
}
if ix == len(h.entries) {
// farther away than all nodes we already have.
// if there was room for it, the node is now the last element.
} else {
// slide existing entries down to make room
// this will overwrite the entry we just appended.
//近的靠前邊
copy(h.entries[ix+1:], h.entries[ix:])
h.entries[ix] = n
}
}
ReadRandomNodes
整體思路是先拷貝出來,再逐個桶的抽最上面的一個,剩下空桶移除,剩下的桶合併後,下一輪再抽桶的第一個節點,直到填滿給定資料或者桶全部空掉。最後返回填到數組裡面的數量。
// ReadRandomNodes fills the given slice with random nodes from the
// table. It will not write the same node more than once. The nodes in
// the slice are copies and can be modified by the caller.
func(tab *Table)ReadRandomNodes(buf []*Node)(n int) {
if !tab.isInitDone() {
return 0
}
tab.mutex.Lock()
defer tab.mutex.Unlock()
// Find all non-empty buckets and get a fresh slice of their entries.
var buckets [][]*Node
//拷貝節點
for _, b := range tab.buckets {
if len(b.entries) > 0 {
buckets = append(buckets, b.entries[:])
}
}
if len(buckets) == 0 {
return 0
}
// Shuffle the buckets.
for i := len(buckets) - 1; i > 0; i-- {
j := tab.rand.Intn(len(buckets))
buckets[i], buckets[j] = buckets[j], buckets[i]
}
// Move head of each bucket into buf, removing buckets that become empty.
var i, j int
for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
b := buckets[j]
buf[i] = &(*b[0]) //取第一個節點
buckets[j] = b[1:] //移除第一個
if len(b) == 1 {
//空桶移除
buckets = append(buckets[:j], buckets[j+1:]...)
}
if len(buckets) == 0 {
break
}
}
return i + 1
}
Lookup
lookup會要求已知節點查詢鄰居節點,查詢的鄰居節點又遞迴的找它周邊的節點
for {
// ask the alpha closest nodes that we haven't asked yet
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] {
asked[n.ID] = true
pendingQueries++
go func() {
// Find potential neighbors to bond with
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil {
// Bump the failure counter to detect and evacuate non-bonded entries
fails := tab.db.findFails(n.ID) + 1
tab.db.updateFindFails(n.ID, fails)
log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
}
reply <- tab.bondall(r)
}()
}
}
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
for _, n := range <-reply { //此處會阻塞請求
if n != nil && !seen[n.ID] {
seen[n.ID] = true
result.push(n, bucketSize)
}
}
pendingQueries--
}
桶的維護
桶初始化完成後會進入一個迴圈邏輯,其中通過三個timer控制調整週期。
- 驗證timer 間隔 10s左右
- 重新整理timer 間隔 30 min
- 持久化timer 間隔 30s
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
重新整理邏輯:重新載入種子節點,查詢周邊節點,隨機三個節點,並查詢這三個節點的周圍節點。
func(tab *Table)doRefresh(done chanstruct{}) {
defer close(done)
tab.loadSeedNodes(true)
tab.lookup(tab.self.ID, false)
for i := 0; i < 3; i++ {
var target NodeID
crand.Read(target[:])
tab.lookup(target, false)
}
}
驗證邏輯:驗證每個桶的最末尾節點,如果該節點通過驗證則放到隊首(驗證過程是本地節點向它傳送ping請求,如果迴應pong則通過)
last, bi := tab.nodeToRevalidate() //取最後一個節點
if last == nil {
// No non-empty bucket found.
return
}
// Ping the selected node and wait for a pong.
err := tab.ping(last.ID, last.addr()) //通訊驗證
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.buckets[bi]
if err == nil {
// The node responded, move it to the front.
log.Debug("Revalidated node", "b", bi, "id", last.ID)
b.bump(last) //提到隊首
return
}
Peer/Server
相關檔案
.
├── dial.go //封裝一個任務生成處理結構以及三種任務結構中(此處命名不太精確)
├── message.go //定義一些資料的讀寫介面,以及對外的Send/SendItem函式
├── peer.go //封裝了Peer 包括訊息讀取
├── rlpx.go //內部的握手協議
├── server.go //初始化,維護Peer網路,還有一些對外的介面
這一層會不斷的從路由中提取節點,提取出來的節點要經過身份驗證,協議檢查之後加入到peer裡面,緊接著如果沒有人使用這個peer,這個peer就會被刪除,再重新選擇一些節點出來繼續這個流程,peer再其中是隨生隨銷,這樣做是為了平均的使用所有的節點,而不是僅僅依賴於特定的幾個節點。因而這裡從Server開始入手分析整個流程
Peers() //peer物件
PeerCount() //peer數量
AddPeer(node *discover.Node) //新增節點
RemovePeer(node *discover.Node) //刪除節點
SubscribeEvents(ch chan *PeerEvent) //訂閱內部的事件(節點的增加,刪除)
//以上四個屬於對外的介面,不影響內部邏輯
Start() //server開始工作
SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) //啟動一個連線,經過兩次驗證之後,如果通過則加入到peer之中。
Start初始化
Start做了三件事,生成路由表於建立底層網路。生成DialState用於驅動維護本地peer的更新與死亡,監聽本地介面用於資訊應答。這裡主要分析peer的維護過程。函式是run函式。
func(srv *Server)Start()(err error) {
//**************初始化程式碼省略
if !srv.NoDiscovery && srv.DiscoveryV5 {
unhandled = make(chan discover.ReadPacket, 100)
sconn = &sharedUDPConn{conn, unhandled}
}
// node table
if !srv.NoDiscovery {
//路由表生成
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
AnnounceAddr: realaddr,
NodeDBPath: srv.NodeDatabase,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
}
ntab, err := discover.ListenUDP(conn, cfg)
if err != nil {
return err
}
srv.ntab = ntab
}
if srv.DiscoveryV5 {
//路由表生成
var (
ntab *discv5.Network
err error
)
if sconn != nil {
ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
} else {
ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
}
if err != nil {
return err
}
if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
return err
}
srv.DiscV5 = ntab
}
dynPeers := srv.maxDialedConns()
//newDialState 物件生成,這個物件包含Peer的實際維護程式碼
dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
// handshake 協議載入
srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
for _, p := range srv.Protocols {
srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
}
// listen/dial
//監聽本地埠
if srv.ListenAddr != "" {
if err := srv.startListening(); err != nil {
return err
}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
srv.loopWG.Add(1)
//重要的一句,開個協程,在其中做peer的維護
go srv.run(dialer)
srv.running = true
return nil
}
run 開始peer的生成
該函式中定義了兩個佇列
runningTasks []task //正在執行的任務
queuedTasks []task //尚未執行的任務
定義了三個匿名函式
//從正在執行任務中刪除任務
delTask := func