1. 程式人生 > >以太坊 p2p Server 原理及實現

以太坊 p2p Server 原理及實現

以太坊p2p原理與實現

區塊鏈技術的去中心依賴於底層組網技術,以太坊的底層實現了p2pServer,大約可以分為這樣三層。

  • 底層路由表。封裝了kad路由,節點的資料結構以及計算記錄,節點搜尋,驗證等功能。
  • 中層peer抽象,message開放傳送介面,server對外提供peer檢測,初始化,事件訂閱,peer狀態查詢,啟動,停止等功能
  • 以太坊最上層peer,peerset再封裝,通過協議的Run函式,在中層啟動peer時,獲取peer,最終通過一個迴圈擷取穩定peer,包裝在peerset中使用。

底層路由表

這裡簡化問題僅討論Node Discovery Protocol。 這一層維護了一個buckets桶,總共有17個桶,每個桶有16個節點和10個替換節點。 Node放入時先要計算hash和localNode的距離。再按距離選擇一個桶放進去,取的時候逐個計算target和每個桶中物件的舉例,詳細參考closest函式,後面會貼出來。

距離公式滿足:f(x,y)=256-8*n-map(x[n+1]^y[n+1]) 注:n為相同節點數量 map為一個負相關的對映關係。

簡單來說就是相似越多,值越小。細節參考Node.go的logdist函式。 這裡需要了解演算法Kademlia,

.
├── database.go         //封裝node資料庫相關操作
├── node.go             //節點資料結構
├── ntp.go              //同步時間  
├── table.go            //路由表
├── udp.go              //網路相關操作

其中最重要的就是table物件,table公共方法有:

  • newTable 例項建立
  • Self local節點獲取
  • ReadRandomNodes 隨機讀取幾個節點
  • Close 關閉
  • Resolve 在周邊查詢某個節點
  • Lookup 查詢某個節點的鄰近節點

逐個來分析這些方法:

newTable

  • 1:生成物件例項(獲取資料庫客戶端,LocalNode etc)
    // If no node database was given, use an in-memory one
    db, err := newNodeDB(nodeDBPath, Version, ourID)
    if err != nil {
        return nil, err
    }
    tab := &Table{
        net:        t,
        db:         db,
        self:       NewNode(ourID, ourAddr.IP, uint16
(ourAddr.Port), uint16(ourAddr.Port)), bonding: make(map[NodeID]*bondproc), bondslots: make(chan struct{}, maxBondingPingPongs), refreshReq: make(chan chan struct{}), initDone: make(chan struct{}), closeReq: make(chan struct{}), closed: make(chan struct{}), rand: mrand.New(mrand.NewSource(0)), ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, }
  • 2:載入引導節點,初始化k桶。
    if err := tab.setFallbackNodes(bootnodes); err != nil {
        return nil, err
    }
    for i := 0; i < cap(tab.bondslots); i++ {
        tab.bondslots <- struct{}{}
    }
    for i := range tab.buckets {
        tab.buckets[i] = &bucket{
            ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
        }
    }
  • 3:將節點放入到桶裡,生成一條協程用於重新整理,驗證節點。
    tab.seedRand()
    tab.loadSeedNodes(false)  //載入種子節點
    // Start the background expiration goroutine after loading seeds so that the search for
    // seed nodes also considers older nodes that would otherwise be removed by the
    // expiration.
    tab.db.ensureExpirer()
    go tab.loop()

載入種子節點

    func(tab *Table)loadSeedNodes(bond bool) {
        seeds := tab.db.querySeeds(seedCount, seedMaxAge)
        //資料庫中的種子節點和引導節點合併
        seeds = append(seeds, tab.nursery...) 
        if bond {
            seeds = tab.bondall(seeds)   //節點驗證
        }
        for i := range seeds {
            seed := seeds[i]
            age := log.Lazy{Fn: func()interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
            log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
            tab.add(seed)               //節點入桶
        }
    }

節點入桶,同時也要檢查ip等限制。

    func(tab *Table)add(new *Node) {
        tab.mutex.Lock()
        defer tab.mutex.Unlock()

        b := tab.bucket(new.sha)   //獲取當前節點對應的桶
        if !tab.bumpOrAdd(b, new) {
            // Node is not in table. Add it to the replacement list.
            tab.addReplacement(b, new)
        }
    }

桶的選擇

    func (tab *Table) bucket(sha common.Hash) *bucket {
        d := logdist(tab.self.sha, sha)  //計算hash舉例
        if d <= bucketMinDistance {
            //這裡按演算法來看,只要hash前三位相等就會到第一個buckets
            return tab.buckets[0]
        }
        return tab.buckets[d-bucketMinDistance-1]
    }

Resolve

根據Node的Id查詢Node,先在當前的桶裡面查詢,查詢一遍之後沒找到就在周邊的節點裡面搜尋一遍再找。

    // Resolve searches for a specific node with the given ID.
    // It returns nil if the node could not be found.
    func(tab *Table)Resolve(targetID NodeID) *Node {
        // If the node is present in the local table, no
        // network interaction is required.
        hash := crypto.Keccak256Hash(targetID[:])
        tab.mutex.Lock()
        //查詢最近節點
        cl := tab.closest(hash, 1)
        tab.mutex.Unlock()
        if len(cl.entries) > 0 && cl.entries[0].ID == targetID {
            return cl.entries[0]
        }
        // Otherwise, do a network lookup.
        //不存在 搜尋鄰居節點
        result := tab.Lookup(targetID)
        for _, n := range result {
            if n.ID == targetID {
                return n
            }
        }
        return nil
    }

這裡需要理解的函式是 closest,遍歷所有桶的所有節點,查詢最近的一個

    // closest returns the n nodes in the table that are closest to the
    // given id. The caller must hold tab.mutex.
    func(tab *Table)closest(target common.Hash, nresults int) *nodesByDistance {
        // This is a very wasteful way to find the closest nodes but
        // obviously correct. I believe that tree-based buckets would make
        // this easier to implement efficiently.
        close := &nodesByDistance{target: target}
        for _, b := range tab.buckets {
            for _, n := range b.entries {
                close.push(n, nresults)
            }
        }
        return close
    }

    func(h *nodesByDistance)push(n *Node, maxElems int) {
        ix := sort.Search(len(h.entries), func(i int)bool {
            return distcmp(h.target, h.entries[i].sha, n.sha) > 0
        })
        if len(h.entries) < maxElems {
            h.entries = append(h.entries, n)
        }
        if ix == len(h.entries) {
            // farther away than all nodes we already have.
            // if there was room for it, the node is now the last element.
        } else {
            // slide existing entries down to make room
            // this will overwrite the entry we just appended.
            //近的靠前邊
            copy(h.entries[ix+1:], h.entries[ix:])
            h.entries[ix] = n
        }
    }

ReadRandomNodes

整體思路是先拷貝出來,再逐個桶的抽最上面的一個,剩下空桶移除,剩下的桶合併後,下一輪再抽桶的第一個節點,直到填滿給定資料或者桶全部空掉。最後返回填到數組裡面的數量。

    // ReadRandomNodes fills the given slice with random nodes from the
    // table. It will not write the same node more than once. The nodes in
    // the slice are copies and can be modified by the caller.
    func(tab *Table)ReadRandomNodes(buf []*Node)(n int) {
        if !tab.isInitDone() {
            return 0
        }
        tab.mutex.Lock()
        defer tab.mutex.Unlock()

        // Find all non-empty buckets and get a fresh slice of their entries.
        var buckets [][]*Node
        //拷貝節點
        for _, b := range tab.buckets {
            if len(b.entries) > 0 {
                buckets = append(buckets, b.entries[:])
            }
        }
        if len(buckets) == 0 {
            return 0
        }
        // Shuffle the buckets.
        for i := len(buckets) - 1; i > 0; i-- {
            j := tab.rand.Intn(len(buckets))
            buckets[i], buckets[j] = buckets[j], buckets[i]
        }
        // Move head of each bucket into buf, removing buckets that become empty.
        var i, j int
        for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
            b := buckets[j]
            buf[i] = &(*b[0])  //取第一個節點
            buckets[j] = b[1:] //移除第一個
            if len(b) == 1 {
                //空桶移除
                buckets = append(buckets[:j], buckets[j+1:]...)  
            }
            if len(buckets) == 0 {
                break          
            }
        }
        return i + 1
    }

Lookup

lookup會要求已知節點查詢鄰居節點,查詢的鄰居節點又遞迴的找它周邊的節點

    for {
        // ask the alpha closest nodes that we haven't asked yet
        for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
            n := result.entries[i]
            if !asked[n.ID] {
                asked[n.ID] = true
                pendingQueries++   
                go func() {
                    // Find potential neighbors to bond with
                    r, err := tab.net.findnode(n.ID, n.addr(), targetID)
                    if err != nil {
                        // Bump the failure counter to detect and evacuate non-bonded entries
                        fails := tab.db.findFails(n.ID) + 1
                        tab.db.updateFindFails(n.ID, fails)
                        log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

                        if fails >= maxFindnodeFailures {
                            log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
                            tab.delete(n)
                        }
                    }
                    reply <- tab.bondall(r)
                }()
            }
        }
        if pendingQueries == 0 {
            // we have asked all closest nodes, stop the search
            break
        }
        // wait for the next reply
        for _, n := range <-reply {    //此處會阻塞請求
            if n != nil && !seen[n.ID] {
                seen[n.ID] = true
                result.push(n, bucketSize)
            }
        }
        pendingQueries--
    }

桶的維護

桶初始化完成後會進入一個迴圈邏輯,其中通過三個timer控制調整週期。

  • 驗證timer 間隔 10s左右
  • 重新整理timer 間隔 30 min
  • 持久化timer 間隔 30s
    revalidate     = time.NewTimer(tab.nextRevalidateTime())
    refresh        = time.NewTicker(refreshInterval)
    copyNodes      = time.NewTicker(copyNodesInterval)

重新整理邏輯:重新載入種子節點,查詢周邊節點,隨機三個節點,並查詢這三個節點的周圍節點。

    func(tab *Table)doRefresh(done chanstruct{}) {
        defer close(done)

        tab.loadSeedNodes(true)

        tab.lookup(tab.self.ID, false)

        for i := 0; i < 3; i++ {
            var target NodeID
            crand.Read(target[:])
            tab.lookup(target, false)
        }
    }

驗證邏輯:驗證每個桶的最末尾節點,如果該節點通過驗證則放到隊首(驗證過程是本地節點向它傳送ping請求,如果迴應pong則通過)

    last, bi := tab.nodeToRevalidate()  //取最後一個節點
    if last == nil {
        // No non-empty bucket found.
        return
    }

    // Ping the selected node and wait for a pong.
    err := tab.ping(last.ID, last.addr())   //通訊驗證

    tab.mutex.Lock()
    defer tab.mutex.Unlock()
    b := tab.buckets[bi]
    if err == nil {
        // The node responded, move it to the front.
        log.Debug("Revalidated node", "b", bi, "id", last.ID)
        b.bump(last)    //提到隊首
        return
    }

Peer/Server

相關檔案

.
├── dial.go          //封裝一個任務生成處理結構以及三種任務結構中(此處命名不太精確)
├── message.go       //定義一些資料的讀寫介面,以及對外的Send/SendItem函式
├── peer.go          //封裝了Peer 包括訊息讀取  
├── rlpx.go          //內部的握手協議
├── server.go        //初始化,維護Peer網路,還有一些對外的介面

這一層會不斷的從路由中提取節點,提取出來的節點要經過身份驗證,協議檢查之後加入到peer裡面,緊接著如果沒有人使用這個peer,這個peer就會被刪除,再重新選擇一些節點出來繼續這個流程,peer再其中是隨生隨銷,這樣做是為了平均的使用所有的節點,而不是僅僅依賴於特定的幾個節點。因而這裡從Server開始入手分析整個流程

    Peers()                             //peer物件
    PeerCount()                         //peer數量
    AddPeer(node *discover.Node)        //新增節點
    RemovePeer(node *discover.Node)     //刪除節點
    SubscribeEvents(ch chan *PeerEvent) //訂閱內部的事件(節點的增加,刪除)
    //以上四個屬於對外的介面,不影響內部邏輯
    Start()                             //server開始工作
    SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node)  //啟動一個連線,經過兩次驗證之後,如果通過則加入到peer之中。

Start初始化

Start做了三件事,生成路由表於建立底層網路。生成DialState用於驅動維護本地peer的更新與死亡,監聽本地介面用於資訊應答。這裡主要分析peer的維護過程。函式是run函式。

    func(srv *Server)Start()(err error) {
        
        //**************初始化程式碼省略
        if !srv.NoDiscovery && srv.DiscoveryV5 {
            unhandled = make(chan discover.ReadPacket, 100)
            sconn = &sharedUDPConn{conn, unhandled}
        }

        // node table
        if !srv.NoDiscovery {
            //路由表生成
            cfg := discover.Config{
                PrivateKey:   srv.PrivateKey,
                AnnounceAddr: realaddr,
                NodeDBPath:   srv.NodeDatabase,
                NetRestrict:  srv.NetRestrict,
                Bootnodes:    srv.BootstrapNodes,
                Unhandled:    unhandled,
            }
            ntab, err := discover.ListenUDP(conn, cfg)
            if err != nil {
                return err
            }
            srv.ntab = ntab
        }

        if srv.DiscoveryV5 {
            //路由表生成
            var (
                ntab *discv5.Network
                err  error
            )
            if sconn != nil {
                ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
            } else {
                ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
            }
            if err != nil {
                return err
            }
            if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
                return err
            }
            srv.DiscV5 = ntab
        }

        dynPeers := srv.maxDialedConns()
        //newDialState 物件生成,這個物件包含Peer的實際維護程式碼
        dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

        // handshake  協議載入
        srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
        for _, p := range srv.Protocols {
            srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
        }
        // listen/dial
        //監聽本地埠
        if srv.ListenAddr != "" {
            if err := srv.startListening(); err != nil {
                return err
            }
        }
        if srv.NoDial && srv.ListenAddr == "" {
            srv.log.Warn("P2P server will be useless, neither dialing nor listening")
        }

        srv.loopWG.Add(1)
        //重要的一句,開個協程,在其中做peer的維護
        go srv.run(dialer)
        srv.running = true
        return nil
    }

run 開始peer的生成

該函式中定義了兩個佇列

    runningTasks []task //正在執行的任務
    queuedTasks  []task //尚未執行的任務

定義了三個匿名函式

    //從正在執行任務中刪除任務
	delTask := func