1. 程式人生 > >以太坊P2P協議

以太坊P2P協議

建立P2P server

func (n *Node) Start() error {
    ...

    // Initialize the p2p server. This creates the node key and
    // discovery databases.
    n.serverConfig = n.config.P2P
    n.serverConfig.PrivateKey = n.config.NodeKey()
    n.serverConfig.Name = n.config.NodeName()
    n.serverConfig.Logger = n.log
    if
n.serverConfig.StaticNodes == nil { n.serverConfig.StaticNodes = n.config.StaticNodes() } if n.serverConfig.TrustedNodes == nil { n.serverConfig.TrustedNodes = n.config.TrustedNodes() } if n.serverConfig.NodeDatabase == "" { n.serverConfig.NodeDatabase = n.config.NodeDB() } running := &p2p.Server{Config: n.serverConfig} n.log.Info("Starting peer-to-peer node"
, "instance", n.serverConfig.Name) .... }

程式碼首先做了一些檢查工作:加鎖、判斷結點是否已經執行、檢查datadir是否可以開啟,然後初始化P2P server配置,最後用該配置建立了一個p2p.Server例項。首先初始化Node中的services欄位,然後遍歷serviceFuncs,也就是之前註冊的所有Service的建構函式列表。在建立Service例項之前,先為每個Service建立一個ServiceContext,之前提到過,ServiceContext裡儲存的是從Node繼承過來的一些資訊。接著通過建構函式建立Service例項,然後加入到service這個map中。

建立Service

// Otherwise copy and specialize the P2P configuration
    services := make(map[reflect.Type]Service)
    for _, constructor := range n.serviceFuncs {
        // Create a new context for the particular service
        ctx := &ServiceContext{
            config:         n.config,
            services:       make(map[reflect.Type]Service),
            EventMux:       n.eventmux,
            AccountManager: n.accman,
        }
        for kind, s := range services { // copy needed for threaded access
            ctx.services[kind] = s
        }
        // Construct and save the service
        service, err := constructor(ctx)
        if err != nil {
            return err
        }
        kind := reflect.TypeOf(service)
        if _, exists := services[kind]; exists {
            return &DuplicateServiceError{Kind: kind}
        }
        services[kind] = service
    }

首先初始化Node中的services欄位,然後遍歷serviceFuncs,也就是之前註冊的所有Service的建構函式列表。在建立Service例項之前,先為每個Service建立一個ServiceContext,之前提到過,ServiceContext裡儲存的是從Node繼承過來的一些資訊。接著通過建構函式建立Service例項,然後加入到service這個map中。

啟動P2P server

// Gather the protocols and start the freshly assembled P2P server  
    for _, service := range services {  
        running.Protocols = append(running.Protocols, service.Protocols()...)  
    }  
    if err := running.Start(); err != nil {  
        return convertFileLockError(err)  
    }  

首先把所有Service支援的協議集合到一起,然後呼叫p2p.Server的Start()方法啟動P2P server(程式碼位於p2p/server.go)。P2P server會繫結一個UDP埠和一個TCP埠,埠號是相同的(預設30303)。UDP埠主要用於結點發現,TCP埠主要用於業務資料傳輸,基於RLPx加密傳輸協議。所以具體來說,Start()方法做了以下幾件事情:

  • 偵聽UDP埠:用於結點發現

  • 發起UDP請求獲取結點表:內部會啟動goroutine來完成

  • 偵聽TCP埠:用於業務資料傳輸,基於RLPx協議

  • 發起TCP請求連線到其他結點:也是啟動goroutine完成

// p2p/server.go
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
    srv.lock.Lock()
    defer srv.lock.Unlock()
    if srv.running {
        return errors.New("server already running")
    }
    srv.running = true
    srv.log = srv.Config.Logger
    if srv.log == nil {
        srv.log = log.New()
    }
    srv.log.Info("Starting P2P networking")

    // static fields
    if srv.PrivateKey == nil {
        return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
    }
    if srv.newTransport == nil {
        srv.newTransport = newRLPX
    }
    if srv.Dialer == nil {
        srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
    }
    srv.quit = make(chan struct{})
    srv.addpeer = make(chan *conn)
    srv.delpeer = make(chan peerDrop)
    srv.posthandshake = make(chan *conn)
    srv.addstatic = make(chan *discover.Node)
    srv.removestatic = make(chan *discover.Node)
    srv.peerOp = make(chan peerOpFunc)
    srv.peerOpDone = make(chan struct{})

    var (
        conn      *net.UDPConn
        sconn     *sharedUDPConn
        realaddr  *net.UDPAddr
        unhandled chan discover.ReadPacket
    )

    if !srv.NoDiscovery || srv.DiscoveryV5 {
        addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
        if err != nil {
            return err
        }
        conn, err = net.ListenUDP("udp", addr)
        if err != nil {
            return err
        }
        realaddr = conn.LocalAddr().(*net.UDPAddr)
        if srv.NAT != nil {
            if !realaddr.IP.IsLoopback() {
                go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
            }
            // TODO: react to external IP changes over time.
            if ext, err := srv.NAT.ExternalIP(); err == nil {
                realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
            }
        }
    }

    if !srv.NoDiscovery && srv.DiscoveryV5 {
        unhandled = make(chan discover.ReadPacket, 100)
        sconn = &sharedUDPConn{conn, unhandled}
    }

    // node table
    if !srv.NoDiscovery {
        cfg := discover.Config{
            PrivateKey:   srv.PrivateKey,
            AnnounceAddr: realaddr,
            NodeDBPath:   srv.NodeDatabase,
            NetRestrict:  srv.NetRestrict,
            Bootnodes:    srv.BootstrapNodes,
            Unhandled:    unhandled,
        }
        ntab, err := discover.ListenUDP(conn, cfg)
        if err != nil {
            return err
        }
        srv.ntab = ntab
    }

    if srv.DiscoveryV5 {
        var (
            ntab *discv5.Network
            err  error
        )
        if sconn != nil {
            ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
        } else {
            ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
        }
        if err != nil {
            return err
        }
        if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
            return err
        }
        srv.DiscV5 = ntab
    }

    dynPeers := srv.maxDialedConns()
    dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

    // handshake
    srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
    for _, p := range srv.Protocols {
        srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
    }
    // listen/dial
    if srv.ListenAddr != "" {
        if err := srv.startListening(); err != nil {
            return err
        }
    }
    if srv.NoDial && srv.ListenAddr == "" {
        srv.log.Warn("P2P server will be useless, neither dialing nor listening")
    }

    srv.loopWG.Add(1)
    go srv.run(dialer)
    srv.running = true
    return nil
}

啟動Service

// Start each of the services
    started := []reflect.Type{}
    for kind, service := range services {
        // Start the next service, stopping all previous upon failure
        if err := service.Start(running); err != nil {
            for _, kind := range started {
                services[kind].Stop()
            }
            running.Stop()

            return err
        }
        // Mark the service started for potential cleanup
        started = append(started, kind)
    }

主要就是依次呼叫每個Service的Start()方法,然後把啟動的Service的型別儲存到started表中。之前提到 Ethereum 作為一個service,被Node註冊進去。Node start的時候會啟動其註冊的所有服務,Ethereum service也是一樣。

ethereum service

ethereum service的初始化

eth/backend.go
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    if config.SyncMode == downloader.LightSync {
        return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
    }
    if !config.SyncMode.IsValid() {
        return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
    }
    chainDb, err := CreateDB(ctx, config, "chaindata")
    if err != nil {
        return nil, err
    }
    chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
    if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
        return nil, genesisErr
    }
    log.Info("Initialised chain configuration", "config", chainConfig)

    eth := &Ethereum{
        config:         config,
        chainDb:        chainDb,
        chainConfig:    chainConfig,
        eventMux:       ctx.EventMux,
        accountManager: ctx.AccountManager,
        engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
        shutdownChan:   make(chan bool),
        networkId:      config.NetworkId,
        gasPrice:       config.GasPrice,
        etherbase:      config.Etherbase,
        bloomRequests:  make(chan chan *bloombits.Retrieval),
        bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),
    }

    log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)

    if !config.SkipBcVersionCheck {
        bcVersion := rawdb.ReadDatabaseVersion(chainDb)
        if bcVersion != core.BlockChainVersion && bcVersion != 0 {
            return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
        }
        rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
    }
    var (
        vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
        cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
    )
    eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
    if err != nil {
        return nil, err
    }
    // Rewind the chain in case of an incompatible config upgrade.
    if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
        log.Warn("Rewinding chain to upgrade configuration", "err", compat)
        eth.blockchain.SetHead(compat.RewindTo)
        rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
    }
    eth.bloomIndexer.Start(eth.blockchain)

    if config.TxPool.Journal != "" {
        config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
    }
    eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

    if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
        return nil, err
    }
    eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
    eth.miner.SetExtra(makeExtraData(config.ExtraData))

    eth.APIBackend = &EthAPIBackend{eth, nil}
    gpoParams := config.GPO
    if gpoParams.Default == nil {
        gpoParams.Default = config.GasPrice
    }
    eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

    return eth, nil
}
  • 如果config.SyncMode 是 downloader.LightSync,走的是les/backend.go的初始化方法。
  • chainDb, err := CreateDB(ctx, config, “chaindata”)開啟leveldb,leveldb是eth儲存資料庫。
  • stopDbUpgrade := upgradeDeduplicateData(chainDb) 檢查chainDb版本,如果需要的話,啟動後臺程序進行升級。
  • chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)裝載創世區塊。 根據節點條件判斷是從資料庫裡面讀取,還是從預設配置檔案讀取,還是從自定義配置檔案讀取,或者是從程式碼裡面獲取預設值。並返回區塊鏈的config和創世塊的hash。
  • 裝載Etherum struct的各個成員。eventMux和accountManager 是Node 啟動 eth service的時候傳入的。eventMux可以認為是一個全域性的事件多路複用器,accountManager認為是一個全域性的賬戶管理器。engine建立共識引擎。etherbase 配置此Etherum的主賬號地址。初始化bloomRequests 通道和bloom過濾器。
  • 判斷客戶端版本號和資料庫版本號是否一致
  • eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) 初始化eth的blockchain,也就是eth的區塊鏈
  • eth.blockchain.SetHead(compat.RewindTo) 根據創始區塊設定區塊頭
  • eth.bloomIndexer.Start(eth.blockchain)啟動bloomIndexer
  • eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) 初始化eth 區塊鏈的交易池,儲存本地生產的和P2P網路同步過來的交易。
  • eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb)初始化以太坊協議管理器,用於區塊鏈P2P通訊
  • miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) 初始化礦工
  • eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams) 建立預言最新gasprice的預言機

ethereum service 啟動

func (s *Ethereum) Start(srvr *p2p.Server) error {
    // Start the bloom bits servicing goroutines
    s.startBloomHandlers()

    // Start the RPC service
    s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

    // Figure out a max peers count based on the server limits
    maxPeers := srvr.MaxPeers
    if s.config.LightServ > 0 {
        if s.config.LightPeers >= srvr.MaxPeers {
            return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
        }
        maxPeers -= s.config.LightPeers
    }
    // Start the networking layer and the light server if requested
    s.protocolManager.Start(maxPeers)
    if s.lesServer != nil {
        s.lesServer.Start(srvr)
    }
    return nil
}

首先啟動bloom過濾器 eth 的net 相關Api 加入RPC 服務。
s.protocolManager.Start(maxPeers) 設定最大同步節點數,並啟動eth P2P通訊。
如果ethereum service 出問題了才會啟動lesServer。

ProtocolManager 以太坊P2P通訊協議管理

ethereum service的初始化 也會呼叫 NewProtocolManager

func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
...
    if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
            return nil, err
        }

        ....
}

ProtocolManager 的初始化方法

func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
    // Create the protocol manager with the base fields
    manager := &ProtocolManager{
        networkId:   networkId,
        eventMux:    mux,
        txpool:      txpool,
        blockchain:  blockchain,
        chainconfig: config,
        peers:       newPeerSet(),
        newPeerCh:   make(chan *peer),
        noMorePeers: make(chan struct{}),
        txsyncCh:    make(chan *txsync),
        quitSync:    make(chan struct{}),
    }
    // Figure out whether to allow fast sync or not
    if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
        log.Warn("Blockchain not empty, fast sync disabled")
        mode = downloader.FullSync
    }
    if mode == downloader.FastSync {
        manager.fastSync = uint32(1)
    }
    // Initiate a sub-protocol for every implemented version we can handle
    manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
    for i, version := range ProtocolVersions {
        // Skip protocol version if incompatible with the mode of operation
        if mode == downloader.FastSync && version < eth63 {
            continue
        }
        // Compatible; initialise the sub-protocol
        version := version // Closure for the run
        manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
            Name:    ProtocolName,
            Version: version,
            Length:  ProtocolLengths[i],
            Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                peer := manager.newPeer(int(version), p, rw)
                select {
                case manager.newPeerCh <- peer:
                    manager.wg.Add(1)
                    defer manager.wg.Done()
                    return manager.handle(peer)
                case <-manager.quitSync:
                    return p2p.DiscQuitting
                }
            },
            NodeInfo: func() interface{} {
                return manager.NodeInfo()
            },
            PeerInfo: func(id discover.NodeID) interface{} {
                if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                    return p.Info()
                }
                return nil
            },
        })
    }
    if len(manager.SubProtocols) == 0 {
        return nil, errIncompatibleConfig
    }
    // Construct the different synchronisation mechanisms
    manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)

    validator := func(header *types.Header) error {
        return engine.VerifyHeader(blockchain, header, true)
    }
    heighter := func() uint64 {
        return blockchain.CurrentBlock().NumberU64()
    }
    inserter := func(blocks types.Blocks) (int, error) {
        // If fast sync is running, deny importing weird blocks
        if atomic.LoadUint32(&manager.fastSync) == 1 {
            log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
            return 0, nil
        }
        atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
        return manager.blockchain.InsertChain(blocks)
    }
    manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

    return manager, nil
}
  • peers 為以太坊臨近的同步網路節點,newPeerCh、noMorePeers、txsyncCh、quitSync對應同步的通知
  • manager.SubProtocols 建立以太坊 P2P server 的 通訊協議,通常只有一個值。manager.SubProtocols,在Node start的時候傳給以太坊 P2P server並同時start P2P server。協議裡面三個函式指標(Run、NodeInfo、PeerInfo)非常重要,後面會用到。
  • manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
    建立了一個下載器,從遠端網路節點中獲取hashes和blocks。
  • manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)收集網路其他以太坊節點發過來的同步通知,進行驗證,並做出相應的處理。初始化傳入的幾個引數 都是用於處理同步區塊鏈資料的函式指標

Ethereum service 啟動的時候會同時啟動 ProtocolManager。

ProtocolManager的start()方法:

func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers

// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()

// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()

// start sync handlers
go pm.syncer()
go pm.txsyncLoop()

}

  • 建立一個新交易的訂閱通道,並啟動交易廣播的goroutine
  • 建立一個挖坑的訂閱通道,並啟動
  • pm.syncer() 啟動同步goroutine,定時的和網路其他節點同步,並處理網路節點的相關通知
  • pm.txsyncLoop() 啟動交易同步goroutine,把新的交易均勻的同步給網路節點

ProtocolManager主動向網路節點廣播

ProtocolManager Start()方法裡面的4個goroutine都是處理ProtocolManager向以太坊網路節點進行廣播的。

  • pm.txBroadcastLoop()方法
func (pm *ProtocolManager) txBroadcastLoop() {
    for {
        select {
        case event := <-pm.txsCh:
            pm.BroadcastTxs(event.Txs)

        // Err() channel will be closed when unsubscribing.
        case <-pm.txsSub.Err():
            return
        }
    }
}

core/tx_pool.go 產生新的交易的時候會send self.txCh,這時候會啟用 self.BroadcastTx(event.Tx.Hash(), event.Tx)

func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
    // Broadcast transaction to a batch of peers not knowing about it
    peers := pm.peers.PeersWithoutTx(hash)
    //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
    for _, peer := range peers {
        peer.SendTransactions(types.Transactions{tx})
    }
    log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}

向快取的沒有這個交易hash的網路節點廣播此次交易。

  • pm.minedBroadcastLoop()方法
// Mined broadcast loop
func (self *ProtocolManager) minedBroadcastLoop() {
    // automatically stops if unsubscribe
    for obj := range self.minedBlockSub.Chan() {
        switch ev := obj.Data.(type) {
        case core.NewMinedBlockEvent:
            self.BroadcastBlock(ev.Block, true)  // First propagate block to peers
            self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
        }
    }
}

收到 miner.go 裡面 NewMinedBlockEvent 挖到新區塊的事件通知,啟用self.BroadcastBlock(ev.Block, true)

func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
    hash := block.Hash()
    peers := pm.peers.PeersWithoutBlock(hash)

    // If propagation is requested, send to a subset of the peer
    if propagate {
        // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
        var td *big.Int
        if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
            td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
        } else {
            log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
            return
        }
        // Send the block to a subset of our peers
        transfer := peers[:int(math.Sqrt(float64(len(peers))))]
        for _, peer := range transfer {
            peer.SendNewBlock(block, td)
        }
        log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        return
    }
    // Otherwise if the block is indeed in out own chain, announce it
    if pm.blockchain.HasBlock(hash, block.NumberU64()) {
        for _, peer := range peers {
            peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
        }
        log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
    }
}

如果propagate為true 向網路節點廣播整個挖到的block,為false 只廣播挖到的區塊的hash值和number值。廣播的區塊還包括這個區塊打包的所有交易。

  • pm.syncer() 方法
func (pm *ProtocolManager) syncer() {
    // Start and ensure cleanup of sync mechanisms
    pm.fetcher.Start()
    defer pm.fetcher.Stop()
    defer pm.downloader.Terminate()

    // Wait for different events to fire synchronisation operations
    forceSync := time.NewTicker(forceSyncCycle)
    defer forceSync.Stop()

    for {
        select {
        case <-pm.newPeerCh:
            // Make sure we have peers to select from, then sync
            if pm.peers.Len() < minDesiredPeerCount {
                break
            }
            go pm.synchronise(pm.peers.BestPeer())

        case <-forceSync.C:
            // Force a sync even if not enough peers are present
            go pm.synchronise(pm.peers.BestPeer())

        case <-pm.noMorePeers:
            return
        }
    }
}

pm.fetcher.Start()啟動 fetcher,輔助同步區塊資料

當P2P server執行 ProtocolManager 的p2p.Protocol 的Run指標的時候會send pm.newPeerCh,這時候選擇最優的網路節點(TD 總難度最大的)啟動pm.synchronise(pm.peers.BestPeer()) goroutine。

  • pm.txsyncLoop()方法
func (pm *ProtocolManager) txsyncLoop() {
    var (
        pending = make(map[discover.NodeID]*txsync)
        sending = false               // whether a send is active
        pack    = new(txsync)         // the pack that is being sent
        done    = make(chan error, 1) // result of the send
    )

    // send starts a sending a pack of transactions from the sync.
    send := func(s *txsync) {
        // Fill pack with transactions up to the target size.
        size := common.StorageSize(0)
        pack.p = s.p
        pack.txs = pack.txs[:0]
        for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
            pack.txs = append(pack.txs, s.txs[i])
            size += s.txs[i].Size()
        }
        // Remove the transactions that will be sent.
        s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
        if len(s.txs) == 0 {
            delete(pending, s.p.ID())
        }
        // Send the pack in the background.
        s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
        sending = true
        go func() { done <- pack.p.SendTransactions(pack.txs) }()
    }

    // pick chooses the next pending sync.
    pick := func() *txsync {
        if len(pending) == 0 {
            return nil
        }
        n := rand.Intn(len(pending)) + 1
        for _, s := range pending {
            if n--; n == 0 {
                return s
            }
        }
        return nil
    }

    for {
        select {
        case s := <-pm.txsyncCh:
            pending[s.p.ID()] = s
            if !sending {
                send(s)
            }
        case err := <-done:
            sending = false
            // Stop tracking peers that cause send failures.
            if err != nil {
                pack.p.Log().Debug("Transaction send failed", "err", err)
                delete(pending, pack.p.ID())
            }
            // Schedule the next send.
            if s := pick(); s != nil {
                send(s)
            }
        case <-pm.quitSync:
            return
        }
    }
}

當從網路節點同步過來最新的交易資料後,本地也會把新同步下來的交易資料廣播給網路中的其他節點。這四個goroutine 基本上就在不停的做廣播區塊、廣播交易,同步到區塊、同步到交易,再廣播區塊、廣播交易。