1. 程式人生 > >深入區塊鏈以太坊原始碼之p2p通訊

深入區塊鏈以太坊原始碼之p2p通訊

一、p2p網路中分為有結構和無結構的網路
	無結構化的:
		這種p2p網路即最普通的,不對結構作特別設計的實現方案。
		優點是結構簡單易於組建,網路區域性區域內個體可任意分佈,
		反正此時網路結構對此也沒有限制;特別是在應對大量新個體加
		入網路和舊個體離開網路(“churn”)時它的表現非常穩定。
		缺點在於在該網路中查詢資料的效率太低,因為沒有預知資訊,
		所以往往需要將查詢請求發遍整個網路(至少大多數個體),
		這會佔用很大一部分網路資源,並大大拖慢網路中其他業務執行。

	結構化的
		這種p2p網路中的個體分佈經過精心設計,主要目的是為了提高查詢資料的效率,
		降低查詢資料帶來的資源消耗。
	以太坊採用了不需要結構化的結構,經過改進的非結構化(比如設計好相鄰個體列表peerSet結構)
網路模型可以滿足需求;

二、分散式hash表(DHT)
儲存資料
(以下只是大致原理,具體的協議實現可能會有差異)
	當某個節點得到了新加入的資料(K/V),它會先計算自己與新資料的 key 之間的“距離”;
然後再計算它所知道的其它節點與這個 key 的距離。
如果計算下來,自己與 key 的距離最小,那麼這個資料就保持在自己這裡。
否則的話,把這個資料轉發給距離最小的節點。
收到資料的另一個節點,也採用上述過程進行處理(遞迴處理)。
獲取資料
(以下只是大致原理,具體的協議實現可能會有差異)
	當某個節點接收到查詢資料的請求(key),它會先計算自己與 key 之間的“距離”;
	然後再計算它所知道的其它節點與這個 key 的距離。
如果計算下來,自己與 key 的距離最小,那麼就在自己這裡找有沒有 key 對應的 value。
有的話就返回 value,沒有的話就報錯。
否則的話,把這個資料轉發給距離最小的節點。
收到資料的另一個節點,也採用上述過程進行處理(遞迴處理)。



三、以太坊中p2p通訊的管理模組ProtocolManager

/geth.go
// Start creates a live P2P node and starts running it.
func (n *Node) Start() error {
	return n.node.Start()
}

/*
	Protocol:容納應用程式所要求的回撥函式等.並通過p2p.Server{}在新連線建立後,將其傳遞給通訊物件peer。
	Node.Start()中首先會建立p2p.Server{},此時Server中的Protocol[]還是空的;
	然後將Node中載入的所有<Service>實現體中的Protocol都收集起來,
	一併交給Server物件,作為Server.Protocols列表;然後啟動Server物件,
	並將Server物件作為引數去逐一啟動每個<Service>實現體。

*/
/node.go
// Start create a live P2P node and starts running it.
func (n *Node) Start() error {
	
	...
	/*
		...
		初始化serverConfig
	*/
	running := &p2p.Server{Config: n.serverConfig}
	...
	// Gather the protocols and start the freshly assembled P2P server
	for _, service := range services {
		running.Protocols = append(running.Protocols, service.Protocols()...)
	}
	if err := running.Start(); err != nil { //見下面的(srv *Server)Start方法
		return convertFileLockError(err)
	}
	// Start each of the services
	started := []reflect.Type{}
	for kind, service := range services {
		// Start the next service, stopping all previous upon failure
		//啟動每個services通過下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
		if err := service.Start(running); err != nil {
			for _, kind := range started {
				services[kind].Stop()
			}
			running.Stop()

			return err
		}
		...
	}
}


// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
	srv.lock.Lock()
	//srv.lock為了避免多執行緒重複啟動
	defer srv.lock.Unlock()
	if srv.running {
		return errors.New("server already running")
	}
	srv.running = true
	srv.log = srv.Config.Logger
	if srv.log == nil {
		srv.log = log.New()
	}
	if srv.NoDial && srv.ListenAddr == "" {
		srv.log.Warn("P2P server will be useless, neither dialing nor listening")
	}

	// static fields
	if srv.PrivateKey == nil {
		return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
	}
	//newTransport使用了newRLPX使用了rlpx.go中的網路協議。
	if srv.newTransport == nil {
		srv.newTransport = newRLPX
	}

	if srv.Dialer == nil {
		srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
	}
	srv.quit = make(chan struct{})
	srv.addpeer = make(chan *conn)
	srv.delpeer = make(chan peerDrop)
	srv.posthandshake = make(chan *conn)
	srv.addstatic = make(chan *enode.Node)
	srv.removestatic = make(chan *enode.Node)
	srv.addtrusted = make(chan *enode.Node)
	srv.removetrusted = make(chan *enode.Node)
	srv.peerOp = make(chan peerOpFunc)
	srv.peerOpDone = make(chan struct{})
	//srv.setupLocalNode()這裡主要執行握手
	if err := srv.setupLocalNode(); err != nil {
		return err
	}
	if srv.ListenAddr != "" {
		//監聽TCP埠-->用於業務資料傳輸,基於RLPx協議)
		//在setupListening中有個go srv.listenLoop()去監聽某個埠有無主動發來的IP連線

		if err := srv.setupListening(); err != nil {
			return err
		}
	}
	//偵聽UDP埠(用於結點發現內部會啟動goroutine)
	if err := srv.setupDiscovery(); err != nil {
		return err
	}

	dynPeers := srv.maxDialedConns()
	dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
	srv.loopWG.Add(1)
	// 啟動新執行緒發起TCP連線請求

	//在run()函式中,監聽srv.addpeer通道有沒有資訊如果有遠端peer發來連線請求,
	//則呼叫Server.newPeer()生成新的peer物件,並把Server.Protocols全交給peer。
	/*
	case c := <-srv.addpeer:
		err := srv.protoHandshakeChecks(peers, inboundCount, c)
		if err == nil {
			// The handshakes are done and it passed all checks.
			p := newPeer(c, srv.Protocols)
		}
	*/
	go srv.run(dialer)
	return nil
}


// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
	// Start the bloom bits servicing goroutines
	s.startBloomHandlers(params.BloomBitsBlocks)

	// Start the RPC service
	s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

	// Figure out a max peers count based on the server limits
	maxPeers := srvr.MaxPeers
	if s.config.LightServ > 0 {
		if s.config.LightPeers >= srvr.MaxPeers {
			return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
		}
		maxPeers -= s.config.LightPeers
	}
	// Start the networking layer and the light server if requested
	s.protocolManager.Start(maxPeers)
	if s.lesServer != nil {
		s.lesServer.Start(srvr)
	}
	return nil
}

/eth/handler.go

type ProtocolManager struct {
	networkID uint64

	fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
	acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)

	txpool      txPool
	blockchain  *core.BlockChain
	chainconfig *params.ChainConfig
	maxPeers    int

	//Downloader型別成員負責所有向相鄰個體主動發起的同步流程。
	downloader *downloader.Downloader
	//Fetcher型別成員累積所有其他個體傳送來的有關新資料的宣佈訊息,並在自身對照後做出安排
	fetcher    *fetcher.Fetcher
	//用來快取相鄰個體列表,peer{}表示網路中的一個遠端個體。
	peers      *peerSet

	SubProtocols []p2p.Protocol

	eventMux      *event.TypeMux
	txsCh         chan core.NewTxsEvent
	txsSub        event.Subscription
	minedBlockSub *event.TypeMuxSubscription

	
	//通過各種通道(chan)和事件訂閱(subscription)的方式,接收和傳送包括交易和區塊在內的資料更新。
	//當然在應用中,訂閱也往往利用通道來實現事件通知。

	// channels for fetcher, syncer, txsyncLoop
	newPeerCh   chan *peer
	txsyncCh    chan *txsync
	quitSync    chan struct{}
	noMorePeers chan struct{}

	// wait group is used for graceful shutdowns during downloading
	// and processing
	wg sync.WaitGroup
}

	Start()函式是ProtocolManager的啟動函式,它會在eth.Ethereum.Start()中被主動呼叫。
ProtocolManager.Start()會啟用4個單獨執行緒(goroutine,協程)去分別執行4個函式,
這也標誌著該以太坊個體p2p通訊的全面啟動。


func (pm *ProtocolManager) Start(maxPeers int) {
	pm.maxPeers = maxPeers

	// broadcast transactions
	//廣播交易的通道。 txsCh會作為txpool的TxPreEvent訂閱通道。
	//txpool有了這種訊息會通知給這個txsCh。 廣播交易的goroutine會把這個訊息廣播出去。
	pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
	//訂閱交易資訊
	pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
	
	go pm.txBroadcastLoop()

	//訂閱挖礦訊息。當新的Block被挖出來的時候會產生訊息
	// broadcast mined blocks
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	//挖礦廣播 goroutine 當挖出來的時候需要儘快的廣播到網路上面去。
	go pm.minedBroadcastLoop()

	// start sync handlers
	// 同步器負責週期性地與網路同步,下載雜湊和塊以及處理通知處理程式。
	go pm.syncer()
	// txsyncLoop負責每個新連線的初始事務同步。 當新的peer出現時,
	// 轉發所有當前待處理的事務。為了最小化出口頻寬使用,我們一次只發送一個小包。
	go pm.txsyncLoop()
}

//txBroadcastLoop()會在txCh通道的收端持續等待,一旦接收到有關新交易的事件,
	//會立即呼叫BroadcastTx()函式廣播給那些尚無該交易物件的相鄰個體。
//------------------go pm.txBroadcastLoop()-----------------------
func (pm *ProtocolManager) txBroadcastLoop() {
	for {
		select {
		case event := <-pm.txsCh:
			pm.BroadcastTxs(event.Txs)

		// Err() channel will be closed when unsubscribing.
		case <-pm.txsSub.Err():
			return
		}
	}
}


// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
	var txset = make(map[*peer]types.Transactions)

	// Broadcast transactions to a batch of peers not knowing about it
	for _, tx := range txs {
		peers := pm.peers.PeersWithoutTx(tx.Hash())
		for _, peer := range peers {
			txset[peer] = append(txset[peer], tx)
		}
		log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
	}
	// FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
	for peer, txs := range txset {
		peer.AsyncSendTransactions(txs)
	}
}

// AsyncSendTransactions queues list of transactions propagation to a remote
// peer. If the peer's broadcast queue is full, the event is silently dropped.
// 同步Tx給每個相鄰未知Tx的peer
func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
	select {
	case p.queuedTxs <- txs:
		for _, tx := range txs {
			p.knownTxs.Add(tx.Hash())
		}
	default:
		p.Log().Debug("Dropping transaction propagation", "count", len(txs))
	}
}

//------------------go pm.minedBroadcastLoop()-----------------------

// Mined broadcast loop
//挖礦之後的廣播
func (pm *ProtocolManager) minedBroadcastLoop() {
	// automatically stops if unsubscribe
	for obj := range pm.minedBlockSub.Chan() {
		//Data為interface{} ,使用介面斷言的方法將Data轉化為型別NewMinedBlockEvent
		if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
			//BroadcastBlock的第二個引數為true時,會將整個新區塊依次發給相鄰區塊中的一小部分;
			//而當其為false時,僅僅將新區塊的Hash值和Number傳送給所有相鄰列表。
			pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
			pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
		}
	}
}
//---------------------------go pm.syncer()-----------------------------
/*
	syncer()首先啟動fetcher成員,然後進入一個無限迴圈,
	每次迴圈中都會向相鄰peer列表中“最優”的那個peer作一次區塊全鏈同步。
	發起上述同步的理由分兩種:如果有新登記(加入)的相鄰個體,則在整個peer列表數目大於5時,
	發起之;如果沒有新peer到達,則以10s為間隔定時的發起之。
	這裡所謂"最優"指的是peer中所維護區塊鏈的TotalDifficulty(td)最高,
	由於Td是全鏈中從創世塊到最新頭塊的Difficulty值總和,
	所以Td值最高就意味著它的區塊鏈是最新的,跟這樣的peer作區塊全鏈同步,
	顯然改動量是最小的,此即"最優"。
*/
// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {
	// Start and ensure cleanup of sync mechanisms
	//啟動 fetcher,輔助同步區塊資料
	pm.fetcher.Start()
	defer pm.fetcher.Stop()
	defer pm.downloader.Terminate()

	// Wait for different events to fire synchronisation operations
	forceSync := time.NewTicker(forceSyncCycle)
	defer forceSync.Stop()

	for {
		select {
		case <-pm.newPeerCh:
			// Make sure we have peers to select from, then sync
			if pm.peers.Len() < minDesiredPeerCount {
				break
			}
			go pm.synchronise(pm.peers.BestPeer())

		case <-forceSync.C:
			// Force a sync even if not enough peers are present
			go pm.synchronise(pm.peers.BestPeer())

		case <-pm.noMorePeers:
			return
		}
	}
}

// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *Fetcher) Start() {
	go f.loop()
}

//---------------------go.txsyncLoop()---------------------
// txsyncLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
/*當從網路節點同步過來最新的交易資料後,本地也會把新同步下來的交易資料廣播給網路中的其他節點。
*/
func (pm *ProtocolManager) txsyncLoop() {
	var (
		pending = make(map[enode.ID]*txsync)
		sending = false               // whether a send is active
		pack    = new(txsync)         // the pack that is being sent
		done    = make(chan error, 1) // result of the send
	)

	// send starts a sending a pack of transactions from the sync.
	send := func(s *txsync) {
		// Fill pack with transactions up to the target size.
		size := common.StorageSize(0)
		pack.p = s.p
		pack.txs = pack.txs[:0]
		for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
			pack.txs = append(pack.txs, s.txs[i])
			size += s.txs[i].Size()
		}
		// Remove the transactions that will be sent.
		s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
		if len(s.txs) == 0 {
			delete(pending, s.p.ID())
		}
		// Send the pack in the background.
		s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
		sending = true
		go func() { done <- pack.p.SendTransactions(pack.txs) }()
	}

	// pick chooses the next pending sync.
	pick := func() *txsync {
		if len(pending) == 0 {
			return nil
		}
		n := rand.Intn(len(pending)) + 1
		for _, s := range pending {
			if n--; n == 0 {
				return s
			}
		}
		return nil
	}

	for {
		select {
		case s := <-pm.txsyncCh:
			pending[s.p.ID()] = s
			if !sending {
				send(s)
			}
		case err := <-done:
			sending = false
			// Stop tracking peers that cause send failures.
			if err != nil {
				pack.p.Log().Debug("Transaction send failed", "err", err)
				delete(pending, pack.p.ID())
			}
			// Schedule the next send.
			if s := pick(); s != nil {
				send(s)
			}
		case <-pm.quitSync:
			return
		}
	}
}


四個管道主要的功能都是圍繞廣播和同步展開的
(廣播區塊、廣播交易,同步到區塊、同步到交易,再廣播區塊、廣播交易。)



/*
對於peer間通訊而言,除了己方需要主動向對方peer發起通訊(比如Start()中啟動的四個獨立流程)之外,
還需要一種由對方peer主動呼叫的資料傳輸,這種傳輸不僅僅是由對方peer發給己方,
更多的用法是對方peer主動呼叫一個函式讓己方發給它們某些特定資料。這種通訊方式,
在程式碼實現上適合用回撥(callback)來實現。

ProtocolManager.handle()就是這樣一個函式,它會在ProtocolManager物件建立時,
以回撥函式的方式“埋入”每個p2p.Protocol物件中(實現了Protocol.Run()方法)。
之後每當有新peer要與己方建立通訊時,如果對方能夠支援該Protocol,
那麼雙方就可以順利的建立並開始通訊。以下是handle()的基本程式碼:
*/
// handle is the callback invoked to manage the life cycle of an eth peer. When
// this function terminates, the peer is disconnected.
func (pm *ProtocolManager) handle(p *peer) error {
	// Ignore maxPeers if this is a trusted peer
	if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
		return p2p.DiscTooManyPeers
	}
	p.Log().Debug("Ethereum peer connected", "name", p.Name())

	// Execute the Ethereum handshake
	var (
		genesis = pm.blockchain.Genesis()
		head    = pm.blockchain.CurrentHeader()
		hash    = head.Hash()
		number  = head.Number.Uint64()
		td      = pm.blockchain.GetTd(hash, number)
	)
	//握手,與對方peer溝通己方的區塊鏈狀態
	if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
		p.Log().Debug("Ethereum handshake failed", "err", err)
		return err
	}
	//初始化一個讀寫通道,用以跟對方peer相互資料傳輸。
	if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
		rw.Init(p.version)
	}
	// Register the peer locally
	//註冊對方peer,存入己方peer列表;只有handle()函式退出時,才會將這個peer移除出列表。
	if err := pm.peers.Register(p); err != nil {
		p.Log().Error("Ethereum peer registration failed", "err", err)
		return err
	}
	defer pm.removePeer(p.id)

	//Downloader成員註冊這個新peer;Downloader會自己維護一個相鄰peer列表。
	// Register the peer in the downloader. If the downloader considers it banned, we disconnect
	if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
		return err
	}
	// Propagate existing transactions. new transactions appearing
	// after this will be sent via broadcasts.
	/*
	呼叫syncTransactions(),用當前txpool中新累計的tx物件組裝成一個txsync{}物件,
	推送到內部通道txsyncCh。還記得Start()啟動的四個函式麼? 其中第四項txsyncLoop()
	中用以等待txsync{}資料的通道txsyncCh,正是在這裡被推入txsync{}的。
	*/
	pm.syncTransactions(p)

	// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
	if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
		// Request the peer's DAO fork header for extra-data validation
		if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
			return err
		}
		// Start a timer to disconnect if the peer doesn't reply in time
		p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
			p.Log().Debug("Timed out DAO fork-check, dropping")
			pm.removePeer(p.id)
		})
		// Make sure it's cleaned up if the peer dies off
		defer func() {
			if p.forkDrop != nil {
				p.forkDrop.Stop()
				p.forkDrop = nil
			}
		}()
	}
		//在無限迴圈中啟動handleMsg(),當對方peer發出任何msg時,
		//handleMsg()可以捕捉相應型別的訊息並在己方進行處理。
	// main loop. handle incoming messages.
	for {
		if err := pm.handleMsg(p); err != nil {
			p.Log().Debug("Ethereum message handling failed", "err", err)
			return err
		}
	}
}