The Ethereum Downloader Block Synchronization Process

As the amount of data on Ethereum keeps growing, synchronization keeps getting slower; with full sync mode it may well take one or two weeks and still not finish. Ethereum also has a fast sync mode, but the articles I found did not really explain how it works, so I tried to work through it and write it down here; corrections are welcome if anything is wrong.

The fast sync algorithm is described in the pull request below. After reading it I still did not see why less data gets synced and why it is faster, so let's look at how the source code implements it.

https://github.com/ethereum/go-ethereum/pull/1889

Overview


Let's first outline the sync flow, using node A (newly joined to the network) and mining node B as an example:

1 The two nodes first handshake, exchanging their genesis, td, head, and other information.

2 Once the connection succeeds, node B sends a TxMsg message to sync the transactions in its txpool to node A.

    Then each node loops, listening for the other's messages.

3 Node A now syncs data using fast sync, sending GetBlockHeadersMsg, GetBlockBodiesMsg, GetReceiptsMsg, and GetNodeDataMsg in turn to fetch block headers, block bodies, receipts, and state data.

4 Node B responds with BlockHeadersMsg, BlockBodiesMsg, ReceiptsMsg, and NodeDataMsg respectively.

5 Node A assembles the received headers and bodies into blocks and stores them in its local leveldb database, together with the receipts and state data.

6 After node B mines a block, it propagates it to A by sending either NewBlockMsg or NewBlockHashesMsg (depending on where node A sits in node B's peer list). With NewBlockMsg, node A verifies the block and stores it locally right away; with NewBlockHashesMsg, node A hands the hash to the fetcher, which retrieves the header and body, assembles them into a block, and stores it locally. (The message codes involved are listed in the sketch below.)
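For reference, these are the protocol messages the flow above refers to. A paraphrased sketch of the eth/63 message codes (as defined in eth/protocol.go for this generation of go-ethereum; the values are reproduced from memory, so treat them as illustrative):

// Paraphrased eth/63 wire-protocol message codes referenced in the flow above.
// Values reproduced from memory for this generation of go-ethereum; sketch only.
package main

import "fmt"

const (
	StatusMsg          = 0x00 // handshake: protocol version, network id, td, head, genesis
	NewBlockHashesMsg  = 0x01 // announce new block hashes (handled by the fetcher)
	TxMsg              = 0x02 // propagate pending transactions
	GetBlockHeadersMsg = 0x03 // request block headers
	BlockHeadersMsg    = 0x04 // reply with block headers
	GetBlockBodiesMsg  = 0x05 // request block bodies
	BlockBodiesMsg     = 0x06 // reply with block bodies
	NewBlockMsg        = 0x07 // propagate a full block
	GetNodeDataMsg     = 0x0d // request state trie nodes (eth/63, used by fast sync)
	NodeDataMsg        = 0x0e // reply with state trie nodes
	GetReceiptsMsg     = 0x0f // request receipts (eth/63, used by fast sync)
	ReceiptsMsg        = 0x10 // reply with receipts
)

func main() {
	fmt.Printf("fast sync request codes: %#x %#x %#x %#x\n",
		GetBlockHeadersMsg, GetBlockBodiesMsg, GetReceiptsMsg, GetNodeDataMsg)
}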

Basic Concepts

A brief rundown of a few basic concepts:

header: the block header

body: all the transactions contained in the block (plus the uncle headers)

block: the block itself, consisting only of the header and the body

receipt: the result of executing the block's transactions/contracts; logs live here. Receipts are not broadcast by default. During fast sync a node fetches them from peers; a normally operating node (once synced to the head) executes all the transactions in each block it receives and derives the receipts itself.

statedb: the world state, holding the state of every account, stored as a Merkle Patricia Trie (MPT). Like receipts, it is not part of the block and is not broadcast. Only during fast sync is it fetched from other nodes; a normally operating node regenerates it by executing all the transactions when it receives a block.

All of this data lives in the node's leveldb database. Under normal operation, what gets broadcast is the block, i.e. the header plus the body (all the transactions). A deliberately simplified sketch of these data shapes follows.
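To make the relationship between these pieces concrete, here is a deliberately simplified sketch of the data shapes involved; the real definitions live in core/types and carry many more fields (gas, bloom, signatures, and so on):

// Deliberately simplified shapes of the data involved in a sync; illustration
// only, the real types live in go-ethereum's core/types package.
package main

import "fmt"

type Hash [32]byte

type Header struct {
	ParentHash  Hash   // link to the previous block
	TxHash      Hash   // root of the transaction trie -- lets a body be validated
	ReceiptHash Hash   // root of the receipt trie -- lets receipts be validated
	Root        Hash   // state trie root -- what fast sync walks via GetNodeDataMsg
	Number      uint64 // block height
}

type Transaction struct{ Payload []byte }

// Body is what a GetBlockBodiesMsg request returns: transactions plus uncles.
type Body struct {
	Transactions []*Transaction
	Uncles       []*Header
}

// A block is just header + body. Receipts and state are stored separately in
// leveldb and are never part of the broadcast block.
type Block struct {
	Header *Header
	Body   *Body
}

func main() {
	b := &Block{Header: &Header{Number: 1}, Body: &Body{}}
	fmt.Println("block", b.Header.Number, "txs:", len(b.Body.Transactions))
}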

Getting Started

Let's start from the sync code; I'll skip the earlier initialization and get straight to the point: if geth is run without specifying a sync mode, the default is fast mode.

func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
	
	// Figure out whether to allow fast sync or not
        // If the mode is fast sync but the current block number is greater than 0, switch to full sync.
        // So if a node syncs for a while, disconnects, and then syncs again, is that fast mode or full mode?
        // The answer is still fast mode -- we'll see why when we get to that part of the code.
	if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
		log.Warn("Blockchain not empty, fast sync disabled")
		mode = downloader.FullSync
	}
	if mode == downloader.FastSync {
		manager.fastSync = uint32(1)
	}
	// Initiate a sub-protocol for every implemented version we can handle
	manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
	for i, version := range ProtocolVersions {
		// Skip protocol version if incompatible with the mode of operation
		if mode == downloader.FastSync && version < eth63 {
			continue
		}
		// Compatible; initialise the sub-protocol
		version := version // Closure for the run
		manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
			Name:    ProtocolName,
			Version: version,
			Length:  ProtocolLengths[i],
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				peer := manager.newPeer(int(version), p, rw)
				select {
				case manager.newPeerCh <- peer:
					manager.wg.Add(1)
					defer manager.wg.Done()
					return manager.handle(peer)
				case <-manager.quitSync:
					return p2p.DiscQuitting
				}
			},
			NodeInfo: func() interface{} {
				return manager.NodeInfo()
			},
			PeerInfo: func(id discover.NodeID) interface{} {
				if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
					return p.Info()
				}
				return nil
			},
		})
	}
	
	// Construct the different synchronisation mechanisms
        // Initialize the downloader
	manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
        // validator: verifies block headers
	validator := func(header *types.Header) error {
		return engine.VerifyHeader(blockchain, header, true)
	}
	heighter := func() uint64 {
		return blockchain.CurrentBlock().NumberU64()
	}
        // inserter: inserts blocks into the blockchain
	inserter := func(blocks types.Blocks) (int, error) {
		// If fast sync is running, deny importing weird blocks
		if atomic.LoadUint32(&manager.fastSync) == 1 {
			log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
			return 0, nil
		}
		atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
		return manager.blockchain.InsertChain(blocks)
	}
        // Initialize the fetcher, passing the validator, heighter, inserter and other callbacks as parameters
	manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

	return manager, nil
}

When a new peer connects successfully, the Run callback above is invoked and writes the peer into the manager.newPeerCh channel.

A goroutine started during initialization keeps listening on this channel:

eth/sync.go:

func (pm *ProtocolManager) syncer() {
	...
	for {
		select {
		case <-pm.newPeerCh:
			// Make sure we have peers to select from, then sync
			if pm.peers.Len() < minDesiredPeerCount {
				break
			}
			go pm.synchronise(pm.peers.BestPeer())

		case <-forceSync.C:
			// Force a sync even if not enough peers are present
			go pm.synchronise(pm.peers.BestPeer())

		case <-pm.noMorePeers:
			return
		}
	}
}

syncer() is a goroutine started at initialization time that keeps watching for newly joined peers.

Starting the Sync

When a new peer connection is detected: minDesiredPeerCount is 5, meaning the sync only starts once there are 5 connections; BestPeer is the peer with the highest td, so we sync from the node on the heaviest (longest) chain.

If a sync is already running and yet another peer joins, does another sync start? The answer is no; we'll see why in the code later. A minimal sketch of the best-peer selection is shown below.
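The best-peer choice is simply a scan for the highest advertised total difficulty; a minimal sketch of that selection (the real peerSet.BestPeer in eth/peer.go does roughly this, under a read lock):

// Minimal sketch of best-peer selection by total difficulty; illustration only.
package main

import (
	"fmt"
	"math/big"
)

type peer struct {
	id string
	td *big.Int // total difficulty advertised during the handshake
}

// bestPeer returns the peer advertising the highest total difficulty.
func bestPeer(peers []*peer) *peer {
	var best *peer
	var bestTd *big.Int
	for _, p := range peers {
		if best == nil || p.td.Cmp(bestTd) > 0 {
			best, bestTd = p, p.td
		}
	}
	return best
}

func main() {
	peers := []*peer{
		{id: "a", td: big.NewInt(100)},
		{id: "b", td: big.NewInt(250)},
		{id: "c", td: big.NewInt(180)},
	}
	fmt.Println("sync from:", bestPeer(peers).id) // sync from: b
}

The sync itself then runs through ProtocolManager.synchronise: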

func (pm *ProtocolManager) synchronise(peer *peer) {
	// Run the sync cycle, and disable fast sync if we've went past the pivot block
	if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
		return
	}
	if atomic.LoadUint32(&pm.fastSync) == 1 {
		log.Info("Fast sync complete, auto disabling")
		atomic.StoreUint32(&pm.fastSync, 0)
	}
	atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
	if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
		// We've completed a sync cycle, notify all peers of new state. This path is
		// essential in star-topology networks where a gateway node needs to notify
		// all its out-of-date peers of the availability of a new block. This failure
		// scenario will most often crop up in private and hackathon networks with
		// degenerate connectivity, but it should be healthy for the mainnet too to
		// more reliably update peers or the local TD state.
		go pm.BroadcastBlock(head, false)
	}
}

This then calls into eth/downloader/downloader.go:

func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
	// This guarantees that only one sync instance runs at any given time
	// Make sure only one goroutine is ever allowed past this point at once
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&d.synchronising, 0)
        ...
	// Retrieve the origin peer and initiate the downloading process
	p := d.peers.Peer(id)
	if p == nil {
		return errUnknownPeer
	}
	return d.syncWithPeer(p, hash, td)
}
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()

	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()

	// Ensure our origin point is below any fast sync pivot point
	pivot := uint64(0)
	if d.mode == FastSync {
		if height <= uint64(fsMinFullBlocks) {
                        // If the remote node's height is no more than 64 (fsMinFullBlocks), reset the common ancestor to 0
			origin = 0
		} else {
                        // Otherwise set the pivot to the remote node's height minus 64
			pivot = height - uint64(fsMinFullBlocks)
			if pivot <= origin {
                                // If the pivot is at or below the common ancestor, move the ancestor back to just before the pivot
				origin = pivot - 1
			}
		}
	}
	d.committed = 1
	if d.mode == FastSync && pivot != 0 {
		d.committed = 0
	}
	// Initiate the sync using a concurrent header and content retrieval algorithm
        // Prepare the queue to start at common ancestor + 1, which makes sense: we sync blocks starting right after the common ancestor
	d.queue.Prepare(origin+1, d.mode)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
	}

	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, pivot, td) },
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	return d.spawnSync(fetchers)
}

1 fetchHeight(p): fetch the latest header from peer p based on its head hash (the most recent head hash p sent during the handshake). A GetBlockHeadersMsg is sent; when p receives it, it reads the header locally and replies with a BlockHeadersMsg, and the local node blocks until that header message arrives.

2 findAncestor(p): find the common ancestor header shared with peer p. This also sends a GetBlockHeadersMsg, with computed parameters requesting a certain number of headers starting from a certain height. Once the headers arrive, it loops from the newest header backwards, checking whether the local lightchain already has each one; if it does, the common ancestor has been found. If not, it falls back to a binary search from the genesis block to locate the common ancestor, and returns it as origin (a block number).

3 With the origin in hand, the queue is prepared to sync from the common ancestor onward, and spawnSync is called to start the download. A simplified sketch of the backwards scan plus binary search inside findAncestor follows.
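A minimal sketch of the idea behind findAncestor, assuming a hasHeader callback that checks whether the local chain contains the remote chain's header at a given height (the real code does this over the network and has more edge handling):

// Sketch of findAncestor's idea: first scan a recent window backwards, then
// fall back to a binary search. hasHeader stands in for the network round
// trips the real code performs.
package main

import "fmt"

// findAncestor returns the highest block number both chains share.
func findAncestor(localHeight, remoteHeight uint64, hasHeader func(n uint64) bool) uint64 {
	top := remoteHeight
	if localHeight < top {
		top = localHeight
	}
	// Phase 1: walk a recent window backwards from the lower of the two heads.
	const window = 192 // roughly MaxHeaderFetch in the real code
	floor := uint64(0)
	if top > window {
		floor = top - window
	}
	for n := top; n > floor; n-- {
		if hasHeader(n) {
			return n
		}
	}
	// Phase 2: binary search for the last height both chains share.
	lo, hi := uint64(0), top
	for lo < hi {
		mid := (lo + hi + 1) / 2
		if hasHeader(mid) {
			lo = mid
		} else {
			hi = mid - 1
		}
	}
	return lo
}

func main() {
	// The local chain is 1000 blocks long but forked off after block 700.
	has := func(n uint64) bool { return n <= 700 }
	fmt.Println(findAncestor(1000, 5000, has)) // 700
}

Back to the main flow: spawnSync, called in step 3, looks like this: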

func (d *Downloader) spawnSync(fetchers []func() error) error {
	errc := make(chan error, len(fetchers))
	d.cancelWg.Add(len(fetchers))
	for _, fn := range fetchers {
		fn := fn
		go func() { defer d.cancelWg.Done(); errc <- fn() }()
	}
	// Wait for the first error, then terminate the others.
	var err error
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
		if err = <-errc; err != nil {
			break
		}
	}
	d.queue.Close()
	d.Cancel()
	return err
}

The main things this function does:

1 It launches every function in fetchers, the set of functions passed in above: fetch headers, fetch bodies, fetch receipts, process headers, and so on.

2 It then reads from the errc channel, waiting for the sync to finish. Note that errc is a buffered channel whose capacity equals the number of fetchers, so the loop waits for every fetcher function to return. This is what makes the call block: it only returns once the whole sync has completed (or one of the fetchers has failed).

3 In fast sync, the last function in fetchers is processFastSyncContent(); in full sync mode it is processFullSyncContent(). The "run them all, wait for them all, keep the first error" pattern used here is sketched below.
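The pattern spawnSync relies on, a buffered error channel sized to the number of workers that is then fully drained, is worth seeing on its own. A standalone sketch with illustrative names (the real code also closes the queue and cancels the remaining fetchers on the first error):

// Standalone sketch of the "run all workers, wait for all, keep the first
// error" pattern that spawnSync uses; names here are illustrative.
package main

import (
	"errors"
	"fmt"
	"sync"
)

func runAll(workers []func() error) error {
	errc := make(chan error, len(workers)) // buffered: no worker ever blocks on send
	var wg sync.WaitGroup
	wg.Add(len(workers))
	for _, fn := range workers {
		fn := fn // capture the loop variable for the closure
		go func() { defer wg.Done(); errc <- fn() }()
	}
	// Drain one result per worker so we only return once everything finished,
	// remembering the first non-nil error.
	var first error
	for range workers {
		if err := <-errc; err != nil && first == nil {
			first = err
		}
	}
	wg.Wait()
	return first
}

func main() {
	err := runAll([]func() error{
		func() error { return nil },
		func() error { return errors.New("fetch bodies failed") },
		func() error { return nil },
	})
	fmt.Println(err) // fetch bodies failed
}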

Syncing State

The state is the world state, holding every account's balance and other information.

func (d *Downloader) processFastSyncContent(latest *types.Header) error {
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
		if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
			d.queue.Close() // wake up WaitResults
		}
	}()
	// Figure out the ideal pivot block. Note, that this goalpost may move if the
	// sync takes long enough for the chain head to move significantly.
	pivot := uint64(0)
	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
		pivot = height - uint64(fsMinFullBlocks)
	}
	// To cater for moving pivot points, track the pivot block and subsequently
	// accumulated download results separately.
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
	for {
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
		if len(results) == 0 {
			// If pivot sync is done, stop
			if oldPivot == nil {
				return stateSync.Cancel()
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
				return stateSync.Cancel()
			default:
			}
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if oldPivot != nil {
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
			latest = results[len(results)-1].Header
			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
				pivot = height - uint64(fsMinFullBlocks)
			}
		}
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
				stateSync.Cancel()

				stateSync = d.syncState(P.Header.Root)
				defer stateSync.Cancel()
				go func() {
					if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
						d.queue.Close() // wake up WaitResults
					}
				}()
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
			case <-stateSync.done:
				if stateSync.err != nil {
					return stateSync.err
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				oldTail = afterP
				continue
			}
		}
		// Fast sync done, pivot commit done, full import
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

1 The latest parameter is the newest header just fetched from the remote peer.

2 The syncState function creates a stateSync struct and writes it to the stateSyncStart channel.

3 The stateFetcher() goroutine, created when the downloader was initialized, sees data on that channel and starts the state sync:

func (d *Downloader) stateFetcher() {
	for {
		select {
		case s := <-d.stateSyncStart:
			for next := s; next != nil; {
				next = d.runStateSync(next)
			}
		case <-d.stateCh:
			// Ignore state responses while no sync is running.
		case <-d.quitCh:
			return
		}
	}
}

eth/downloader/statesync.go:

runStateSync() starts the goroutine s.run(), which internally calls the loop function:

func (s *stateSync) loop() (err error) {
	// Listen for new peer events to assign tasks to them
	newPeer := make(chan *peerConnection, 1024)
	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
	defer peerSub.Unsubscribe()
	defer func() {
		cerr := s.commit(true)
		if err == nil {
			err = cerr
		}
	}()

	// Keep assigning new tasks until the sync completes or aborts
	for s.sched.Pending() > 0 {
		if err = s.commit(false); err != nil {
			return err
		}
		s.assignTasks()
		// Tasks assigned, wait for something to happen
		select {
		case <-newPeer:
			// New peer arrived, try to assign it download tasks

		case <-s.cancel:
			return errCancelStateFetch

		case <-s.d.cancelCh:
			return errCancelStateFetch

		case req := <-s.deliver:
			// Response, disconnect or timeout triggered, drop the peer if stalling
			log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
			if len(req.items) <= 2 && !req.dropped && req.timedOut() {
				// 2 items are the minimum requested, if even that times out, we've no use of
				// this peer at the moment.
				log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
				s.d.dropPeer(req.peer.id)
			}
			// Process all the received blobs and check for stale delivery
			if err = s.process(req); err != nil {
				log.Warn("Node data write error", "err", err)
				return err
			}
			req.peer.SetNodeDataIdle(len(req.response))
		}
	}
	return nil
}

What this code and its helper assignTasks do:

1 Request state data from all idle peers by sending GetNodeDataMsg messages.

2 When a remote peer receives this message, it replies with a NodeDataMsg containing a batch of state (trie node) data.

3 When the local node receives the NodeDataMsg, it writes the data into the stateCh channel.

4 runStateSync() listens on that channel, records the data in the finished list (also stateReq entries), and then writes it to the s.deliver channel.

5 The loop() function that sent the GetNodeDataMsg is listening on s.deliver; once the data arrives, it calls process().

(There is too much code to paste it all here; you can search the source by function name.)

6 process() handles the received state data and writes it locally (into memory first; it is flushed to the leveldb database later during Commit).

7 The peer is then marked idle so more state data can be requested from it. A sketch of how each returned trie node is matched against the requested hashes follows.
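A NodeDataMsg reply is just a list of raw trie-node blobs, and a trie node is identified by the keccak256 hash of its contents, so process() can tell which outstanding request each blob satisfies simply by hashing it. A sketch of that matching, assuming golang.org/x/crypto/sha3 is available for the legacy Keccak-256 Ethereum uses:

// Sketch of matching returned state entries against what was requested: a
// trie node is identified by the keccak256 hash of its content, so hashing
// each returned blob tells us which request it satisfies.
package main

import (
	"encoding/hex"
	"fmt"

	"golang.org/x/crypto/sha3"
)

func keccak256(data []byte) [32]byte {
	h := sha3.NewLegacyKeccak256()
	h.Write(data)
	var out [32]byte
	copy(out[:], h.Sum(nil))
	return out
}

// match keeps only the blobs whose hashes were actually requested; anything
// else (or anything missing) counts against the delivering peer.
func match(requested map[[32]byte]bool, blobs [][]byte) map[[32]byte][]byte {
	delivered := make(map[[32]byte][]byte)
	for _, blob := range blobs {
		if h := keccak256(blob); requested[h] {
			delivered[h] = blob
		}
	}
	return delivered
}

func main() {
	node := []byte("rlp-encoded trie node (placeholder)")
	want := map[[32]byte]bool{keccak256(node): true}
	got := match(want, [][]byte{node, []byte("unrelated garbage")})
	for h := range got {
		fmt.Println("accepted node", hex.EncodeToString(h[:8]))
	}
}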

Syncing Headers

Among the fetchers above, fetchHeaders is the one that syncs headers:

func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
        // skeleton defaults to true: first fetch the skeleton (headers spaced apart), then fill in the headers between the joints from other peers
        skeleton := true
	getHeaders := func(from uint64) {
		request = time.Now()

		ttl = d.requestTTL()
		timeout.Reset(ttl)

		if skeleton {
			// Note the skip argument MaxHeaderFetch-1: the headers fetched are spaced that far apart
			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
		} else {
			// The skeleton phase finished (or failed); fetch headers sequentially
			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
		}
	}
	// Start pulling the header chain skeleton until all is done
	getHeaders(from)

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch
                // Headers fetched from the remote peer are written to the headerCh channel
		case packet := <-d.headerCh:
			// If the skeleton's finished, pull any remaining head headers directly from the origin
                        // If the skeleton fetch is complete, set skeleton to false and fetch sequentially
			if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}
			// If no more headers are inbound, notify the content fetchers and return
			...
			headers := packet.(*headerPack).headers

			// If we received a skeleton batch, resolve internals concurrently
                        // If we received a skeleton batch, start filling it in
			if skeleton {
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
				if err != nil {
					p.log.Debug("Skeleton chain invalid", "err", err)
					return errInvalidChain
				}
				headers = filled[proced:]
				from += uint64(proced)
			}
			// Insert all the new headers and fetch the next batch
			if len(headers) > 0 {
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
				from += uint64(len(headers))
			}
			getHeaders(from)

		}
	}
}

1 First fetch a set of spaced-out headers from the current peer; this is called the skeleton.

2 Then ask other peers to fill in the skeleton so the headers become contiguous.

3 Once filled, the headers are written to the headerProcCh channel (handled in "Processing Headers" below), from is advanced accordingly, and the next batch of headers is fetched. The arithmetic behind one skeleton request is sketched below.
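Concretely, a skeleton request asks for up to MaxSkeletonSize headers starting at from+MaxHeaderFetch-1 with a skip of MaxHeaderFetch-1, so the returned joints are the last headers of consecutive 192-header segments that other peers then fill in. A small sketch of that arithmetic (constants as I recall them for this version: MaxHeaderFetch = 192, MaxSkeletonSize = 128):

// Arithmetic of one skeleton request: each returned "joint" is the last
// header of a 192-header segment that the fill phase completes. Constants
// mirror the downloader's as I recall them; sketch only.
package main

import "fmt"

const (
	MaxHeaderFetch  = 192 // headers per fill request
	MaxSkeletonSize = 128 // skeleton joints per request
)

func skeletonJoints(from uint64) []uint64 {
	joints := make([]uint64, 0, MaxSkeletonSize)
	for i := 0; i < MaxSkeletonSize; i++ {
		joints = append(joints, from+uint64(MaxHeaderFetch)*uint64(i+1)-1)
	}
	return joints
}

func main() {
	joints := skeletonJoints(1)
	fmt.Println(joints[0], joints[1], joints[len(joints)-1])
	// 192 384 24576 -> one skeleton round therefore spans 128*192 = 24576 headers
}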

Processing Headers

func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderProcessing

		case headers := <-d.headerProcCh:
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true

			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				// In case of header only syncing, validate the chunk immediately
				if d.mode == FastSync || d.mode == LightSync {
					// Collect the yet unknown headers to mark them as uncertain
					unknown := make([]*types.Header, 0, len(headers))
					for _, header := range chunk {
						if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
							unknown = append(unknown, header)
						}
					}
					// If we're importing pure headers, verify based on their recentness
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
						// If some headers were inserted, add them too to the rollback list
						if n > 0 {
							rollback = append(rollback, chunk[:n]...)
						}
						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
						return errInvalidChain
					}
					// All verifications passed, store newly found uncertain headers
					rollback = append(rollback, unknown...)
					if len(rollback) > fsHeaderSafetyNet {
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
				if d.mode == FullSync || d.mode == FastSync {
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							return errCancelHeaderProcessing
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
						log.Debug("Stale headers")
						return errBadPeer
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
		}
	}
}

1 When headers arrive, in fast or light sync they are handled in chunks of maxHeadersProcess headers, calling lightchain.InsertHeaderChain() to write the headers into the leveldb database.

2 Then, if the current mode is fast or full sync, d.queue.Schedule(chunk, origin) populates blockTaskPool/blockTaskQueue and (in fast mode only) receiptTaskPool/receiptTaskQueue, which the body and receipt sync below consume. A much-reduced sketch of that scheduling follows.
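Conceptually, Schedule registers a body-download task for every header and, in fast mode, a receipt-download task as well, keyed by header hash. A much-reduced, assumption-heavy sketch of that bookkeeping (the real queue.Schedule in eth/downloader/queue.go additionally uses priority queues, skips empty bodies/receipts, and verifies that the numbers are contiguous):

// Much-reduced sketch of what scheduling a verified header chunk amounts to:
// every header gets a pending body task, and in fast sync a pending receipt
// task too. Illustration only.
package main

import "fmt"

type header struct {
	Number uint64
	Hash   string
}

type queue struct {
	fastSync        bool
	blockTaskPool   map[string]*header // bodies still to be downloaded
	receiptTaskPool map[string]*header // receipts still to be downloaded (fast sync only)
}

func (q *queue) Schedule(headers []*header) int {
	inserted := 0
	for _, h := range headers {
		if _, ok := q.blockTaskPool[h.Hash]; ok {
			continue // already scheduled earlier
		}
		q.blockTaskPool[h.Hash] = h
		if q.fastSync {
			q.receiptTaskPool[h.Hash] = h
		}
		inserted++
	}
	return inserted
}

func main() {
	q := &queue{
		fastSync:        true,
		blockTaskPool:   map[string]*header{},
		receiptTaskPool: map[string]*header{},
	}
	n := q.Schedule([]*header{{1, "h1"}, {2, "h2"}, {3, "h3"}})
	fmt.Println(n, len(q.blockTaskPool), len(q.receiptTaskPool)) // 3 3 3
}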

Syncing Bodies

Among the fetchers above, fetchBodies syncs the block bodies:

func (d *Downloader) fetchBodies(from uint64) error {
	log.Debug("Downloading block bodies", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*bodyPack)
			return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
		}
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
	)
	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

	log.Debug("Block body download terminated", "err", err)
	return err
}

The logic of fetchParts:

1 Take the bodies that need syncing out of the task pool/queue.

2 Call fetch, i.e. FetchBodies here, to request the bodies from a peer by sending a GetBlockBodiesMsg.

3 When the remote peer has processed the request it replies with a BlockBodiesMsg, which is written to the bodyCh channel.

4 When data arrives on bodyCh, the deliver function is called.

So next let's look at the d.queue.DeliverBodies code used by that deliver function:

func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
			return errInvalidBody
		}
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		return nil
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}

1 This uses the blockTaskPool and blockTaskQueue that were populated while processing headers.

2 q.deliver uses the reconstruct function defined here to store the body into the Transactions and Uncles fields of the matching resultCache entry.

3 Those Transactions and Uncles are consumed later when processing the content. A sketch of the resultCache entry they are stored in follows.
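The resultCache entry being filled here is the queue's fetchResult; roughly the following, simplified and made self-contained (the real struct uses core/types and tracks a bit more):

// Roughly what a resultCache entry (queue.fetchResult) holds; simplified.
package main

import "fmt"

type Header struct{ Number uint64 }
type Transaction struct{}
type Receipt struct{}

type fetchResult struct {
	Pending      int     // how many pieces (body, receipts) are still missing
	Header       *Header // set when the header is scheduled
	Uncles       []*Header
	Transactions []*Transaction // filled by DeliverBodies
	Receipts     []*Receipt     // filled by DeliverReceipts (fast sync only)
}

func main() {
	// After the header is scheduled, body and receipts are still pending.
	r := &fetchResult{Pending: 2, Header: &Header{Number: 42}}
	r.Transactions = []*Transaction{{}, {}} // body delivered
	r.Pending--
	r.Receipts = []*Receipt{{}, {}} // receipts delivered
	r.Pending--
	fmt.Println("block", r.Header.Number, "complete:", r.Pending == 0) // block 42 complete: true
}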

Syncing Receipts

Among the fetchers above, fetchReceipts syncs the receipts:

func (d *Downloader) fetchReceipts(from uint64) error {
	log.Debug("Downloading transaction receipts", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*receiptPack)
			return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
		}
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
	)
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

	log.Debug("Transaction receipt download terminated", "err", err)
	return err
}

The code is very similar to fetchBodies:

1 Take the receipts that need syncing out of the task pool/queue.

2 Call fetch, i.e. FetchReceipts here, to request the receipts from a peer by sending a GetReceiptsMsg.

3 When the remote peer has processed the request it replies with a ReceiptsMsg, which is written to the receiptCh channel.

4 When data arrives on receiptCh, the deliver function is called.

Let's look at the d.queue.DeliverReceipts function used by deliver:

func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
			return errInvalidReceipt
		}
		result.Receipts = receiptList[index]
		return nil
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
}

1 This uses the receiptTaskPool and receiptTaskQueue populated while processing headers.

2 q.deliver uses the reconstruct function defined here to store the receipts into the Receipts field of the resultCache entry.

3 When the content is processed later, those Receipts are taken out and written to the local leveldb.

Processing Content

This stage writes the blocks and receipts into the local leveldb.

In fast sync mode the function called is processFastSyncContent.

In full sync mode the function called is processFullSyncContent.

First, the processFullSyncContent code for full mode:

func (d *Downloader) processFullSyncContent() error {
	for {
		results := d.queue.Results(true)
		if len(results) == 0 {
			return nil
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}

1 d.queue.Results() pulls the completed entries out of the resultCache described above.

2 importBlockResults() assembles each result's header and body into a block, and d.blockchain.InsertChain(blocks) then writes the blocks to the local leveldb. A minimal sketch of that assembly follows.
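A minimal sketch of what importBlockResults does with each result (batching, cancellation, and error handling omitted), using the core/types helpers as they existed in this generation of go-ethereum; fetchResultLite is an illustrative stand-in for the downloader's fetchResult:

// Minimal sketch of turning downloaded results into blocks, the way
// importBlockResults does; illustration only.
package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/core/types"
)

// fetchResultLite stands in for the downloader's fetchResult entries.
type fetchResultLite struct {
	Header       *types.Header
	Uncles       []*types.Header
	Transactions types.Transactions
}

func assembleBlocks(results []*fetchResultLite) []*types.Block {
	blocks := make([]*types.Block, len(results))
	for i, r := range results {
		// Header + body -> block; the slice is then fed to blockchain.InsertChain.
		blocks[i] = types.NewBlockWithHeader(r.Header).WithBody(r.Transactions, r.Uncles)
	}
	return blocks
}

func main() {
	h := &types.Header{Number: big.NewInt(42)}
	blocks := assembleBlocks([]*fetchResultLite{{Header: h}})
	fmt.Println(blocks[0].NumberU64()) // 42
}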

Now let's look at the processFastSyncContent function used by fast sync:

func (d *Downloader) processFastSyncContent(latest *types.Header) error {
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
		if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
			d.queue.Close() // wake up WaitResults
		}
	}()
	// Figure out the ideal pivot block. Note, that this goalpost may move if the
	// sync takes long enough for the chain head to move significantly.
	pivot := uint64(0)
	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
		pivot = height - uint64(fsMinFullBlocks)
	}
	// To cater for moving pivot points, track the pivot block and subsequently
	// accumulated download results separately.
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
	for {
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
		if len(results) == 0 {
			// If pivot sync is done, stop
			if oldPivot == nil {
				return stateSync.Cancel()
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
				return stateSync.Cancel()
			default:
			}
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if oldPivot != nil {
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
			latest = results[len(results)-1].Header
			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
				pivot = height - uint64(fsMinFullBlocks)
			}
		}
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
				stateSync.Cancel()

				stateSync = d.syncState(P.Header.Root)
				defer stateSync.Cancel()
				go func() {
					if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
						d.queue.Close() // wake up WaitResults
					}
				}()
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
			case <-stateSync.done:
				if stateSync.err != nil {
					return stateSync.err
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				oldTail = afterP
				continue
			}
		}
		// Fast sync done, pivot commit done, full import
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

Here commitFastSyncData assembles the headers, bodies, and receipts into blocks and writes them to the local leveldb by calling:

d.blockchain.InsertReceiptChain(blocks, receipts), which stores the blocks together with their receipts locally. A simplified sketch of how the downloaded results are split around the pivot follows.
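splitAroundPivot itself just partitions the downloaded results by block number relative to the pivot: everything before the pivot goes through commitFastSyncData (blocks plus receipts, no execution), the pivot waits for its state download, and everything after it is imported with full processing. A simplified sketch:

// Simplified sketch of splitAroundPivot: partition results into before the
// pivot, the pivot itself, and after the pivot.
package main

import "fmt"

type result struct{ Number uint64 }

func splitAroundPivot(pivot uint64, results []*result) (p *result, before, after []*result) {
	for _, r := range results {
		switch {
		case r.Number < pivot:
			before = append(before, r)
		case r.Number == pivot:
			p = r
		default:
			after = append(after, r)
		}
	}
	return p, before, after
}

func main() {
	var rs []*result
	for n := uint64(95); n <= 105; n++ {
		rs = append(rs, &result{Number: n})
	}
	p, before, after := splitAroundPivot(100, rs)
	fmt.Println(len(before), p.Number, len(after)) // 5 100 5
}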

Finishing the Sync

Let's look once more at the place where the sync was started:

func (pm *ProtocolManager) synchronise(peer *peer) {
	// Run the sync cycle, and disable fast sync if we've went past the pivot block
	if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
		return
	}
	if atomic.LoadUint32(&pm.fastSync) == 1 {
		log.Info("Fast sync complete, auto disabling")
		atomic.StoreUint32(&pm.fastSync, 0)
	}
	atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
	if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
		// We've completed a sync cycle, notify all peers of new state. This path is
		// essential in star-topology networks where a gateway node needs to notify
		// all its out-of-date peers of the availability of a new block. This failure
		// scenario will most often crop up in private and hackathon networks with
		// degenerate connectivity, but it should be healthy for the mainnet too to
		// more reliably update peers or the local TD state.
		go pm.BroadcastBlock(head, false)
	}
}

1 When pm.downloader.Synchronise() returns, the sync is complete.

2 pm.fastSync is set to 0, so fast sync mode is exited.

3 The latest head is fetched and the node broadcasts its newest block, moving into normal operation. A follow-up article will cover what happens after the sync completes.

Summary

1 When a node syncs, the default is fast sync mode.

2 The code makes heavy use of goroutines and channel reads/writes for asynchronous communication; this is one of Go's strengths and makes such code very convenient to write.

3 During fast sync, headers, bodies, receipts, and state are fetched from the peer with the highest td and written into the local leveldb database.

4 Once synced up to the head, the node exits fast sync mode.

Fast mode is fast because it downloads the state and receipt data directly, replacing the time-consuming re-execution of every transaction, which is why it syncs faster than full mode. In principle fast sync transfers more data than full sync; the pull request above (https://github.com/ethereum/go-ethereum/pull/1889) reports that fast sync nevertheless ends up storing less data than full sync, presumably because executing transactions generates somewhat more data than what is ultimately kept.