Ethereum Source Code Analysis (11): A Walkthrough of eth's Current PoW Consensus Flow
### Analysis of the eth consensus algorithm, starting from a block mined by the local node
##### In production, mining is certainly not done on CPUs, which means the `RemoteAgent` path is used: a mining rig requests the current block-sealing task from an Ethereum node over the network, computes a hash that satisfies the block's difficulty, and submits it back to the node. A sketch of the work-handout side (`RemoteAgent.GetWork`) comes first, followed by the submission method itself.
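A sketch, assuming the geth version this series is based on (the exact body may differ slightly between releases): `GetWork` hands the external miner the header pow-hash, the DAG seed hash, and the boundary target (2^256 divided by the difficulty), and records the pending work under its pow-hash so `SubmitWork` can find it later.

```
// GetWork hands a work package to an external miner: the header pow-hash,
// the seed hash for the DAG, and the boundary condition ("target").
func (a *RemoteAgent) GetWork() ([3]string, error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	var res [3]string

	if a.currentWork != nil {
		block := a.currentWork.Block

		res[0] = block.HashNoNonce().Hex()
		seedHash := ethash.SeedHash(block.NumberU64())
		res[1] = common.BytesToHash(seedHash).Hex()
		// Calculate the "target" to be returned to the external miner
		n := big.NewInt(1)
		n.Lsh(n, 255)
		n.Div(n, block.Difficulty())
		n.Lsh(n, 1)
		res[2] = common.BytesToHash(n.Bytes()).Hex()

		// Remember the pending work so SubmitWork can look it up by pow-hash
		a.work[block.HashNoNonce()] = a.currentWork
		return res, nil
	}
	return res, errors.New("No work available yet, don't panic.")
}
```

And the submission method: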
```
func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool {
	a.mu.Lock()
	defer a.mu.Unlock()

	// Make sure the work submitted is present
	work := a.work[hash]
	if work == nil {
		log.Info("Work submitted but none pending", "hash", hash)
		return false
	}
	// Make sure the Engine solutions is indeed valid
	result := work.Block.Header()
	result.Nonce = nonce
	result.MixDigest = mixDigest

	if err := a.engine.VerifySeal(a.chain, result); err != nil {
		log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err)
		return false
	}
	block := work.Block.WithSeal(result)

	// Solutions seems to be valid, return to the miner and notify acceptance
	a.returnCh <- &Result{work, block}
	delete(a.work, hash)

	return true
}
```
This method checks that the submitted solution actually satisfies the block's difficulty; if it does, the sealed block is written to a channel. The receiving end of that channel is the wait method in /miner/worker.go:
```
func (self *worker) wait() {
	for {
		mustCommitNewWork := true
		for result := range self.recv {
			atomic.AddInt32(&self.atWork, -1)

			if result == nil {
				continue
			}
			block := result.Block
			work := result.Work

			// Update the block hash in all logs since it is now available and not when the
			// receipt/log of individual transactions were created.
			for _, r := range work.receipts {
				for _, l := range r.Logs {
					l.BlockHash = block.Hash()
				}
			}
			for _, log := range work.state.Logs() {
				log.BlockHash = block.Hash()
			}
			stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
			if err != nil {
				log.Error("Failed writing block to chain", "err", err)
				continue
			}
			// check if canon block and write transactions
			if stat == core.CanonStatTy {
				// implicit by posting ChainHeadEvent
				mustCommitNewWork = false
			}
			// Broadcast the block and announce chain insertion event
			// (the block reaches connected peers over p2p; the hand-off here is still a channel)
			self.mux.Post(core.NewMinedBlockEvent{Block: block})
			var (
				events []interface{}
				logs   = work.state.Logs()
			)
			events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
			if stat == core.CanonStatTy {
				events = append(events, core.ChainHeadEvent{Block: block})
			}
			self.chain.PostChainEvents(events, logs)

			// Insert the block into the set of pending ones to wait for confirmations
			self.unconfirmed.Insert(block.NumberU64(), block.Hash())

			if mustCommitNewWork {
				self.commitNewWork()
			}
		}
	}
}
```
A new-mined-block event is posted here. Following it further, the call stack is `/geth/main.go/geth --> startNode --> utils.StartNode(stack) --> stack.Start() --> /node/node.go/Start() --> service.Start(running) --> /eth/backend.go/Start() --> /eth/handler.go/Start()`. The core logic lives in handler.go/Start():
```
func (pm *ProtocolManager) Start(maxPeers int) {
	pm.maxPeers = maxPeers

	// broadcast transactions
	// txCh is the transaction-broadcast channel: it is subscribed to the txpool's
	// TxPreEvent, so whenever the txpool sees such an event it notifies txCh, and
	// the broadcast goroutine relays the transaction to the network.
	pm.txCh = make(chan core.TxPreEvent, txChanSize)
	// the subscription handle
	pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
	go pm.txBroadcastLoop()

	// Subscribe to mining events: a message is produced whenever a new block is
	// mined. Note this subscription uses a different (deprecated) mechanism from
	// the one above.
	// broadcast mined blocks
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	// The mined-block broadcast goroutine: locally mined blocks must reach the
	// network as quickly as possible, and this is the path they take.
	go pm.minedBroadcastLoop()

	// The syncer periodically synchronises with the network, downloading hashes
	// and blocks as well as handling announcements.
	// start sync handlers
	go pm.syncer()
	// txsyncLoop handles the initial transaction sync for each new connection:
	// when a new peer appears, we relay all currently pending transactions, in
	// small packets one at a time, to minimise egress bandwidth.
	go pm.txsyncLoop()
}
```
`pm.minedBroadcastLoop()` is where the mined-block event posted above is received from the channel; stepping into it leads straight to the logic that pushes the block to peers over p2p.
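Roughly, in this version of geth, the loop just drains the event-mux subscription and calls BroadcastBlock twice — once to propagate the full block, once to announce only its hash:

```
// Mined broadcast loop: drains the minedBlockSub subscription
func (pm *ProtocolManager) minedBroadcastLoop() {
	// automatically stops if unsubscribed
	for obj := range pm.minedBlockSub.Chan() {
		switch ev := obj.Data.(type) {
		case core.NewMinedBlockEvent:
			pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
			pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
		}
	}
}
```

And BroadcastBlock, which it calls: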
```
// BroadcastBlock will either propagate a block to a subset of it's peers, or
// will only announce it's availability (depending what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
	hash := block.Hash()
	peers := pm.peers.PeersWithoutBlock(hash)

	// If propagation is requested, send to a subset of the peer
	if propagate {
		// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
		var td *big.Int
		if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
			td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
		} else {
			log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
			return
		}
		// Send the block to a subset of our peers
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.SendNewBlock(block, td)
		}
		log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
		return
	}
	// Otherwise if the block is indeed in out own chain, announce it
	if pm.blockchain.HasBlock(hash, block.NumberU64()) {
		for _, peer := range peers {
			peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
		}
		log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
	}
}
```
Two kinds of messages are sent here: `NewBlockMsg` and `NewBlockHashesMsg`. At this point, a block mined by the local node starts spreading across the p2p network. Before moving on to the next important method, `NewProtocolManager`, here is a quick look at the hash-announcement sender.
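For the announcement path, `peer.SendNewBlockHashes` packs the hashes and block numbers into a newBlockHashesData payload — roughly, in this version:

```
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
	for _, hash := range hashes {
		p.knownBlocks.Add(hash)
	}
	request := make(newBlockHashesData, len(hashes))
	for i := 0; i < len(hashes); i++ {
		request[i].Hash = hashes[i]
		request[i].Number = numbers[i]
	}
	return p2p.Send(p.rw, NewBlockHashesMsg, request)
}
```

Now NewProtocolManager: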
```
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
	// Create the protocol manager with the base fields
	manager := &ProtocolManager{
		networkId:   networkId,
		eventMux:    mux,
		txpool:      txpool,
		blockchain:  blockchain,
		chaindb:     chaindb,
		chainconfig: config,
		peers:       newPeerSet(),
		newPeerCh:   make(chan *peer),
		noMorePeers: make(chan struct{}),
		txsyncCh:    make(chan *txsync),
		quitSync:    make(chan struct{}),
	}
	// Figure out whether to allow fast sync or not
	if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
		log.Warn("Blockchain not empty, fast sync disabled")
		mode = downloader.FullSync
	}
	if mode == downloader.FastSync {
		manager.fastSync = uint32(1)
	}
	// Initiate a sub-protocol for every implemented version we can handle
	manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
	for i, version := range ProtocolVersions {
		// Skip protocol version if incompatible with the mode of operation
		if mode == downloader.FastSync && version < eth63 {
			continue
		}
		// Compatible; initialise the sub-protocol
		version := version // Closure for the run
		manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
			Name:    ProtocolName,
			Version: version,
			Length:  ProtocolLengths[i],
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				peer := manager.newPeer(int(version), p, rw)
				select {
				case manager.newPeerCh <- peer:
					manager.wg.Add(1)
					defer manager.wg.Done()
					return manager.handle(peer)
				case <-manager.quitSync:
					return p2p.DiscQuitting
				}
			},
			NodeInfo: func() interface{} {
				return manager.NodeInfo()
			},
			PeerInfo: func(id discover.NodeID) interface{} {
				if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
					return p.Info()
				}
				return nil
			},
		})
	}
	if len(manager.SubProtocols) == 0 {
		return nil, errIncompatibleConfig
	}
	// The downloader syncs our own data from other peers; it is the
	// full-chain synchronisation tool.
	// Construct the different synchronisation mechanisms
	manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)

	validator := func(header *types.Header) error {
		return engine.VerifyHeader(blockchain, header, true)
	}
	heighter := func() uint64 {
		return blockchain.CurrentBlock().NumberU64()
	}
	inserter := func(blocks types.Blocks) (int, error) {
		// If fast sync is running, deny importing weird blocks
		if atomic.LoadUint32(&manager.fastSync) == 1 {
			log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
			return 0, nil
		}
		// Start accepting transactions
		atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
		return manager.blockchain.InsertChain(blocks)
	}
	// Construct a fetcher: it accumulates block announcements from the various
	// peers and schedules them for retrieval.
	manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

	return manager, nil
}
```

This method sets up the several sub-protocols spoken under the eth protocol. The `Run` function is invoked for every peer that connects, and as you can see, it blocks. Stepping into the `handle` method reveals this key piece of code:
```
for {
	if err := pm.handleMsg(p); err != nil {
		p.Log().Debug("Ethereum message handling failed", "err", err)
		return err
	}
}
```
An endless loop that handles messages arriving from the p2p network. Next, the `handleMsg` method:
```
func (pm *ProtocolManager) handleMsg(p *peer) error {
	// Read the next message from the remote peer, and ensure it's fully consumed
	msg, err := p.rw.ReadMsg()
	if err != nil {
		return err
	}
	if msg.Size > ProtocolMaxMsgSize {
		return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
	}
	defer msg.Discard()

	// Handle the message depending on its contents
	switch {
	case msg.Code == StatusMsg:
		// Status messages should never arrive after the handshake
		return errResp(ErrExtraStatusMsg, "uncontrolled status message")

	// Block header query, collect the requested headers and reply
	case msg.Code == GetBlockHeadersMsg:
		// Decode the complex header query
		var query getBlockHeadersData
		if err := msg.Decode(&query); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		hashMode := query.Origin.Hash != (common.Hash{})

		// Gather headers until the fetch or network limits is reached
		var (
			bytes   common.StorageSize
			headers []*types.Header
			unknown bool
		)
		for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
			// Retrieve the next header satisfying the query
			var origin *types.Header
			if hashMode {
				origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
			} else {
				origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
			}
			if origin == nil {
				break
			}
			number := origin.Number.Uint64()
			headers = append(headers, origin)
			bytes += estHeaderRlpSize

			// Advance to the next header of the query
			switch {
			case query.Origin.Hash != (common.Hash{}) && query.Reverse:
				// Hash based traversal towards the genesis block
				for i := 0; i < int(query.Skip)+1; i++ {
					if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
						query.Origin.Hash = header.ParentHash
						number--
					} else {
						unknown = true
						break
					}
				}
			case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
				// Hash based traversal towards the leaf block
				var (
					current = origin.Number.Uint64()
					next    = current + query.Skip + 1
				)
				if next <= current {
					infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
					p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
					unknown = true
				} else {
					if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
						if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
							query.Origin.Hash = header.Hash()
						} else {
							unknown = true
						}
					} else {
						unknown = true
					}
				}
			case query.Reverse:
				// Number based traversal towards the genesis block
				if query.Origin.Number >= query.Skip+1 {
					query.Origin.Number -= (query.Skip + 1)
				} else {
					unknown = true
				}

			case !query.Reverse:
				// Number based traversal towards the leaf block
				query.Origin.Number += (query.Skip + 1)
			}
		}
		return p.SendBlockHeaders(headers)

	case msg.Code == BlockHeadersMsg:
		// A batch of headers arrived to one of our previous requests
		var headers []*types.Header
		if err := msg.Decode(&headers); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// If no headers were received, but we're expending a DAO fork check, maybe it's that
		if len(headers) == 0 && p.forkDrop != nil {
			// Possibly an empty reply to the fork header checks, sanity check TDs
			verifyDAO := true

			// If we already have a DAO header, we can check the peer's TD against it. If
			// the peer's ahead of this, it too must have a reply to the DAO check
			if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
				if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
					verifyDAO = false
				}
			}
			// If we're seemingly on the same chain, disable the drop timer
			if verifyDAO {
				p.Log().Debug("Seems to be on the same side of the DAO fork")
				p.forkDrop.Stop()
				p.forkDrop = nil
				return nil
			}
		}
		// Filter out any explicitly requested headers, deliver the rest to the downloader
		filter := len(headers) == 1
		if filter {
			// If it's a potential DAO fork check, validate against the rules
			if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
				// Disable the fork drop timer
				p.forkDrop.Stop()
				p.forkDrop = nil

				// Validate the header and either drop the peer or continue
				if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
					p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
					return err
				}
				p.Log().Debug("Verified to be on the same side of the DAO fork")
				return nil
			}
			// Irrelevant of the fork checks, send the header to the fetcher just in case
			headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
		}
		if len(headers) > 0 || !filter {
			err := pm.downloader.DeliverHeaders(p.id, headers)
			if err != nil {
				log.Debug("Failed to deliver headers", "err", err)
			}
		}

	case msg.Code == GetBlockBodiesMsg:
		// Decode the retrieval message
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
		if _, err := msgStream.List(); err != nil {
			return err
		}
		// Gather blocks until the fetch or network limits is reached
		var (
			hash   common.Hash
			bytes  int
			bodies []rlp.RawValue
		)
		for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
			// Retrieve the hash of the next block
			if err := msgStream.Decode(&hash); err == rlp.EOL {
				break
			} else if err != nil {
				return errResp(ErrDecode, "msg %v: %v", msg, err)
			}
			// Retrieve the requested block body, stopping if enough was found
			if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
				bodies = append(bodies, data)
				bytes += len(data)
			}
		}
		return p.SendBlockBodiesRLP(bodies)

	case msg.Code == BlockBodiesMsg:
		// A batch of block bodies arrived to one of our previous requests
		var request blockBodiesData
		if err := msg.Decode(&request); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Deliver them all to the downloader for queuing
		trasactions := make([][]*types.Transaction, len(request))
		uncles := make([][]*types.Header, len(request))

		for i, body := range request {
			trasactions[i] = body.Transactions
			uncles[i] = body.Uncles
		}
		// Filter out any explicitly requested bodies, deliver the rest to the downloader
		filter := len(trasactions) > 0 || len(uncles) > 0
		if filter {
			trasactions, uncles = pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())
		}
		if len(trasactions) > 0 || len(uncles) > 0 || !filter {
			err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
			if err != nil {
				log.Debug("Failed to deliver bodies", "err", err)
			}
		}

	case p.version >= eth63 && msg.Code == GetNodeDataMsg:
		// Decode the retrieval message
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
		if _, err := msgStream.List(); err != nil {
			return err
		}
		// Gather state data until the fetch or network limits is reached
		var (
			hash  common.Hash
			bytes int
			data  [][]byte
		)
		for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
			// Retrieve the hash of the next state entry
			if err := msgStream.Decode(&hash); err == rlp.EOL {
				break
			} else if err != nil {
				return errResp(ErrDecode, "msg %v: %v", msg, err)
			}
			// Retrieve the requested state entry, stopping if enough was found
			if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
				data = append(data, entry)
				bytes += len(entry)
			}
		}
		return p.SendNodeData(data)

	case p.version >= eth63 && msg.Code == NodeDataMsg:
		// A batch of node state data arrived to one of our previous requests
		var data [][]byte
		if err := msg.Decode(&data); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Deliver all to the downloader
		if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
			log.Debug("Failed to deliver node state data", "err", err)
		}

	case p.version >= eth63 && msg.Code == GetReceiptsMsg:
		// Decode the retrieval message
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
		if _, err := msgStream.List(); err != nil {
			return err
		}
		// Gather state data until the fetch or network limits is reached
		var (
			hash     common.Hash
			bytes    int
			receipts []rlp.RawValue
		)
		for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
			// Retrieve the hash of the next block
			if err := msgStream.Decode(&hash); err == rlp.EOL {
				break
			} else if err != nil {
				return errResp(ErrDecode, "msg %v: %v", msg, err)
			}
			// Retrieve the requested block's receipts, skipping if unknown to us
			results := core.GetBlockReceipts(pm.chaindb, hash, core.GetBlockNumber(pm.chaindb, hash))
			if results == nil {
				if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
					continue
				}
			}
			// If known, encode and queue for response packet
			if encoded, err := rlp.EncodeToBytes(results); err != nil {
				log.Error("Failed to encode receipt", "err", err)
			} else {
				receipts = append(receipts, encoded)
				bytes += len(encoded)
			}
		}
		return p.SendReceiptsRLP(receipts)

	case p.version >= eth63 && msg.Code == ReceiptsMsg:
		// A batch of receipts arrived to one of our previous requests
		var receipts [][]*types.Receipt
		if err := msg.Decode(&receipts); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Deliver all to the downloader
		if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
			log.Debug("Failed to deliver receipts", "err", err)
		}

	case msg.Code == NewBlockHashesMsg:
		var announces newBlockHashesData
		if err := msg.Decode(&announces); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		// Mark the hashes as present at the remote node
		for _, block := range announces {
			p.MarkBlock(block.Hash)
		}
		// Schedule all the unknown hashes for retrieval
		unknown := make(newBlockHashesData, 0, len(announces))
		for _, block := range announces {
			if !pm.blockchain.HasBlock(block.Hash, block.Number) {
				unknown = append(unknown, block)
			}
		}
		for _, block := range unknown {
			pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
		}

	case msg.Code == NewBlockMsg:
		// Retrieve and decode the propagated block
		var request newBlockData
		if err := msg.Decode(&request); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		request.Block.ReceivedAt = msg.ReceivedAt
		request.Block.ReceivedFrom = p

		// Mark the peer as owning the block and schedule it for import
		p.MarkBlock(request.Block.Hash())
		pm.fetcher.Enqueue(p.id, request.Block)

		// Assuming the block is importable by the peer, but possibly not yet done so,
		// calculate the head hash and TD that the peer truly must have.
		var (
			trueHead = request.Block.ParentHash()
			trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
		)
		// Update the peers total difficulty if better than the previous
		if _, td := p.Head(); trueTD.Cmp(td) > 0 {
			p.SetHead(trueHead, trueTD)

			// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
			// a singe block (as the true TD is below the propagated block), however this
			// scenario should easily be covered by the fetcher.
			currentBlock := pm.blockchain.CurrentBlock()
			if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
				go pm.synchronise(p)
			}
		}

	case msg.Code == TxMsg:
		// Transactions arrived, make sure we have a valid and fresh chain to handle them
		if atomic.LoadUint32(&pm.acceptTxs) == 0 {
			break
		}
		// Transactions can be processed, parse all of them and deliver to the pool
		var txs []*types.Transaction
		if err := msg.Decode(&txs); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		for i, tx := range txs {
			// Validate and mark the remote transaction
			if tx == nil {
				return errResp(ErrDecode, "transaction %d is nil", i)
			}
			p.MarkTransaction(tx.Hash())
		}
		pm.txpool.AddRemotes(txs)

	default:
		return errResp(ErrInvalidMsgCode, "%v", msg.Code)
	}
	return nil
}
```
This method decodes the messages coming off the p2p network and handles, among others, the `NewBlockMsg` and `NewBlockHashesMsg` events. For `NewBlockMsg` the handling stays local: `pm.fetcher.Enqueue(p.id, request.Block)` pushes the block into the fetcher's `f.inject` channel. `Enqueue` itself is only a thin wrapper (sketched below); the private enqueue method in fetcher.go then writes the block into an import queue, prioritised by block number.
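A sketch of the wrapper, assuming this era of geth:

```
// Enqueue wraps the block in an inject op and pushes it into f.inject
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
	op := &inject{
		origin: peer,
		block:  block,
	}
	select {
	case f.inject <- op:
		return nil
	case <-f.quit:
		return errTerminated
	}
}
```

The private enqueue method, shown next, does the actual queueing and DOS accounting: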
```
func (f *Fetcher) enqueue(peer string, block *types.Block) {
	hash := block.Hash()

	// Ensure the peer isn't DOSing us
	count := f.queues[peer] + 1
	if count > blockLimit {
		log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
		propBroadcastDOSMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Discard any past or too distant blocks
	if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
		log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
		propBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Schedule the block for future importing
	if _, ok := f.queued[hash]; !ok {
		op := &inject{
			origin: peer,
			block:  block,
		}
		f.queues[peer] = count
		f.queued[hash] = op
		f.queue.Push(op, -float32(block.NumberU64()))
		if f.queueChangeHook != nil {
			f.queueChangeHook(op.block.Hash(), true)
		}
		log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
	}
}
```
The consumer of this queue lives in the loop method of fetcher.go, yet another endless loop. The core code:
```
for !f.queue.Empty() {
	op := f.queue.PopItem().(*inject)
	if f.queueChangeHook != nil {
		f.queueChangeHook(op.block.Hash(), false)
	}
	// If too high up the chain or phase, continue later
	number := op.block.NumberU64()
	if number > height+1 {
		f.queue.Push(op, -float32(op.block.NumberU64()))
		if f.queueChangeHook != nil {
			f.queueChangeHook(op.block.Hash(), true)
		}
		break
	}
	// Otherwise if fresh and still unknown, try and import
	hash := op.block.Hash()
	if number+maxUncleDist < height || f.getBlock(hash) != nil {
		f.forgetBlock(hash)
		continue
	}
	f.insert(op.origin, op.block)
}
```
Blocks are popped off the queue here; next, the `insert` method:
```
func (f *Fetcher) insert(peer string, block *types.Block) {
	hash := block.Hash()

	// Run the import on a new thread
	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
	go func() {
		defer func() { f.done <- hash }()

		// If the parent's unknown, abort insertion
		parent := f.getBlock(block.ParentHash())
		if parent == nil {
			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
			return
		}
		// Quickly validate the header and propagate the block if it passes
		switch err := f.verifyHeader(block.Header()); err {
		case nil:
			// All ok, quickly propagate to our peers
			propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
			go f.broadcastBlock(block, true)

		case consensus.ErrFutureBlock:
			// Weird future block, don't fail, but neither propagate

		default:
			// Something went very wrong, drop the peer
			log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			return
		}
		// If import succeeded, broadcast the block
		propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
		go f.broadcastBlock(block, false)

		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(block)
		}
	}()
}
```
As you can see, this method calls `verifyHeader` to validate the block; if it passes, the block is immediately propagated to peers over p2p, and `insertChain` then imports it into the local chain (backed by leveldb). If the import succeeds, the block is broadcast once more, this time announcing only its hash. In this way, as long as a block is valid, the peer-to-peer network will carry it to the entire network. The `verifyHeader` and `insertChain` callbacks are the ones wired up in `/handler.go/NewProtocolManager` above, and everything is started from `handler.go/Start`: the fetcher's own start method is kicked off on a separate goroutine inside the `syncer` method.
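Roughly what syncer looks like in this version of geth — it starts the fetcher, then fires a synchronisation whenever a new peer connects or a force-sync ticker expires:

```
// syncer is responsible for periodically synchronising with the network,
// downloading hashes and blocks as well as handling announcements.
func (pm *ProtocolManager) syncer() {
	// Start and ensure cleanup of sync mechanisms
	pm.fetcher.Start()
	defer pm.fetcher.Stop()
	defer pm.downloader.Terminate()

	// Wait for different events to fire synchronisation operations
	forceSync := time.NewTicker(forceSyncCycle)
	defer forceSync.Stop()

	for {
		select {
		case <-pm.newPeerCh:
			// Make sure we have peers to select from, then sync
			if pm.peers.Len() < minDesiredPeerCount {
				break
			}
			go pm.synchronise(pm.peers.BestPeer())

		case <-forceSync.C:
			// Force a sync even if not enough peers are present
			go pm.synchronise(pm.peers.BestPeer())

		case <-pm.noMorePeers:
			return
		}
	}
}
```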
The sync path itself is `/handler.go/handleMsg --> go pm.synchronise(p) --> pm.downloader.Synchronise(peer.id, pHead, pTd, mode) --> d.synchronise(id, head, td, mode) --> d.syncWithPeer(p, hash, td)`. A trimmed sketch of the first hop comes next; the core method follows it.
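pm.synchronise mainly guards against pointless syncs — it only hands off to the downloader when the peer's total difficulty is above ours. A sketch, assuming this era of geth (some branches omitted):

```
// synchronise tries to sync up our local block chain with a remote peer.
func (pm *ProtocolManager) synchronise(peer *peer) {
	// Short circuit if no peers are available
	if peer == nil {
		return
	}
	// Make sure the peer's TD is higher than our own
	currentBlock := pm.blockchain.CurrentBlock()
	td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

	pHead, pTd := peer.Head()
	if pTd.Cmp(td) <= 0 {
		return
	}
	// Otherwise try to sync with the downloader
	mode := downloader.FullSync
	if atomic.LoadUint32(&pm.fastSync) == 1 {
		mode = downloader.FastSync
	}
	if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
		return
	}
	// Mark initial sync done and announce our new head to the network
	atomic.StoreUint32(&pm.acceptTxs, 1)
	if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
		go pm.BroadcastBlock(head, false)
	}
}
```

With the guard passed, control reaches the core method, syncWithPeer: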
```
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()
	if p.version < 62 {
		return errTooOld
	}

	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
	defer func(start time.Time) {
		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
	}(time.Now())

	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()

	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()

	// Initiate the sync using a concurrent header and content retrieval algorithm
	pivot := uint64(0)
	switch d.mode {
	case LightSync:
		pivot = height
	case FastSync:
		// Calculate the new fast/slow sync pivot point
		if d.fsPivotLock == nil {
			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
			if err != nil {
				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
			}
			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
			}
		} else {
			// Pivot point locked in, use this and do not pick a new one!
			pivot = d.fsPivotLock.Number.Uint64()
		}
		// If the point is below the origin, move origin back to ensure state download
		if pivot < origin {
			if pivot > 0 {
				origin = pivot - 1
			} else {
				origin = 0
			}
		}
		log.Debug("Fast syncing until pivot block", "pivot", pivot)
	}
	d.queue.Prepare(origin+1, d.mode, pivot, latest)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
	}

	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, td) },
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	err = d.spawnSync(fetchers)
	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
		// If sync failed in the critical section, bump the fail counter.
		atomic.AddUint32(&d.fsPivotFails, 1)
	}
	return err
}
```
Since this entire call stack was triggered from the `NewBlockMsg` case, the `StartEvent` posted here travels through a channel to the update method in miner.go:

```
func (self *Miner) update() {
	events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
	for ev := range events.Chan() {
		switch ev.Data.(type) {
		case downloader.StartEvent:
			atomic.StoreInt32(&self.canStart, 0)
			if self.Mining() {
				self.Stop()
				atomic.StoreInt32(&self.shouldStart, 1)
				log.Info("Mining aborted due to sync")
			}
		case downloader.DoneEvent, downloader.FailedEvent:
			shouldStart := atomic.LoadInt32(&self.shouldStart) == 1

			atomic.StoreInt32(&self.canStart, 1)
			atomic.StoreInt32(&self.shouldStart, 0)
			if shouldStart {
				self.Start(self.coinbase)
			}
			// unsubscribe. we're only interested in this event once
			events.Unsubscribe()
			// stop immediately and ignore all further pending events
			break out
		}
	}
}
```
On receiving this `StartEvent`, the miner notifies all of its agents via `stop` to halt work on the same block; this includes the `Stop` method of `RemoteAgent`.
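Roughly how that shutdown fans out in this version: worker.stop walks the registered agents and calls Stop on each, and RemoteAgent.Stop then closes its channels. A sketch, with field names as I recall them in this era of geth:

```
// worker.stop asks every registered agent to stop mining
func (self *worker) stop() {
	self.wg.Wait()

	self.mu.Lock()
	defer self.mu.Unlock()
	if atomic.LoadInt32(&self.mining) == 1 {
		for agent := range self.agents {
			agent.Stop()
		}
	}
	atomic.StoreInt32(&self.mining, 0)
	atomic.StoreInt32(&self.atWork, 0)
}

// RemoteAgent.Stop flips the running flag and closes its channels
func (a *RemoteAgent) Stop() {
	if !atomic.CompareAndSwapInt32(&a.running, 1, 0) {
		return
	}
	close(a.quitCh)
	close(a.workCh)
}
```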
Finally, let's see how a new block is broadcast to other nodes. The handler is `BroadcastBlock` in /eth/handler.go:
```
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
	hash := block.Hash()
	peers := pm.peers.PeersWithoutBlock(hash)

	// If propagation is requested, send to a subset of the peer
	if propagate {
		// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
		var td *big.Int
		if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
			td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
		} else {
			log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
			return
		}
		// Send the block to a subset of our peers
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.SendNewBlock(block, td)
		}
		log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
		return
	}
	// Otherwise if the block is indeed in out own chain, announce it
	if pm.blockchain.HasBlock(hash, block.NumberU64()) {
		for _, peer := range peers {
			peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
		}
		log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
	}
}
```
This method loops over the connected peers that don't yet have the block (a √N subset when fully propagating) and calls `peer.SendNewBlock` on each to send the new-block message:
```
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
	p.knownBlocks.Add(block.Hash())
	return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}
```
```
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
	size, r, err := rlp.EncodeToReader(data)
	if err != nil {
		return err
	}
	return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}
```
The message is written to the peer via `WriteMsg`; the implementation here is `(rw *netWrapper) WriteMsg(msg Msg)`:
```
func (rw *netWrapper) WriteMsg(msg Msg) error {
	rw.wmu.Lock()
	defer rw.wmu.Unlock()
	rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout))
	return rw.wrapped.WriteMsg(msg)
}
```
This method sets a write deadline and, underneath, calls net.go's `Write(b []byte) (n int, err error)` to push the bytes to the remote node over the network. On the receiving side, the counterpart is `ReadMsg`:
```
func (pm *ProtocolManager) handleMsg(p *peer) error {
	// Read the next message from the remote peer, and ensure it's fully consumed
	msg, err := p.rw.ReadMsg()
	if err != nil {
		return err
	}
	if msg.Size > ProtocolMaxMsgSize {
		return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
	}
	defer msg.Discard()
```
This is where messages written over the network are read back, with different handling per `msgCode`. Since `handleMsg` is called inside an endless loop, the node keeps receiving whatever its peers broadcast:
```
// eth/handler.go
func (pm *ProtocolManager) handle(p *peer) error {
	td, head, genesis := pm.blockchain.Status()
	p.Handshake(pm.networkId, td, head, genesis)

	if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
		rw.Init(p.version)
	}

	pm.peers.Register(p)
	defer pm.removePeer(p.id)

	pm.downloader.RegisterPeer(p.id, p.version, p)

	pm.syncTransactions(p)
	...
	for {
		if err := pm.handleMsg(p); err != nil {
			return err
		}
	}
}
```
The handle() function does the following for each new peer:

- Handshake: exchange our blockchain status with the remote peer.
- Initialise a read/write channel for transferring data with the peer.
- Register the peer into our peer list; it is only removed from the list when handle() exits.
- Register the new peer with the Downloader; the Downloader maintains its own list of neighbouring peers.
- Call syncTransactions(): package the transactions newly accumulated in the txpool into a txsync{} object and push it onto the internal channel txsyncCh. Remember the four goroutines launched from Start()? The fourth, txsyncLoop(), waits on exactly this txsyncCh channel — this is where its txsync{} gets pushed in (see the sketch after this list).
- In an endless loop, run handleMsg(): whenever the remote peer sends any msg, handleMsg() catches the corresponding message type and handles it locally.
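A sketch of syncTransactions, assuming this era of geth — it collects everything pending in the txpool and hands it to txsyncLoop through txsyncCh:

```
// syncTransactions starts sending all currently pending transactions to the given peer.
func (pm *ProtocolManager) syncTransactions(p *peer) {
	var txs types.Transactions
	pending, _ := pm.txpool.Pending()
	for _, batch := range pending {
		txs = append(txs, batch...)
	}
	if len(txs) == 0 {
		return
	}
	select {
	case pm.txsyncCh <- &txsync{p, txs}:
	case <-pm.quitSync:
	}
}
```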