深入區塊鏈以太坊原始碼之p2p通訊
阿新 • • 發佈:2019-01-29
一、p2p網路中分為有結構和無結構的網路 無結構化的: 這種p2p網路即最普通的,不對結構作特別設計的實現方案。 優點是結構簡單易於組建,網路區域性區域內個體可任意分佈, 反正此時網路結構對此也沒有限制;特別是在應對大量新個體加 入網路和舊個體離開網路(“churn”)時它的表現非常穩定。 缺點在於在該網路中查詢資料的效率太低,因為沒有預知資訊, 所以往往需要將查詢請求發遍整個網路(至少大多數個體), 這會佔用很大一部分網路資源,並大大拖慢網路中其他業務執行。 結構化的 這種p2p網路中的個體分佈經過精心設計,主要目的是為了提高查詢資料的效率, 降低查詢資料帶來的資源消耗。 以太坊採用了不需要結構化的結構,經過改進的非結構化(比如設計好相鄰個體列表peerSet結構) 網路模型可以滿足需求; 二、分散式hash表(DHT) 儲存資料 (以下只是大致原理,具體的協議實現可能會有差異) 當某個節點得到了新加入的資料(K/V),它會先計算自己與新資料的 key 之間的“距離”; 然後再計算它所知道的其它節點與這個 key 的距離。 如果計算下來,自己與 key 的距離最小,那麼這個資料就保持在自己這裡。 否則的話,把這個資料轉發給距離最小的節點。 收到資料的另一個節點,也採用上述過程進行處理(遞迴處理)。 獲取資料 (以下只是大致原理,具體的協議實現可能會有差異) 當某個節點接收到查詢資料的請求(key),它會先計算自己與 key 之間的“距離”; 然後再計算它所知道的其它節點與這個 key 的距離。 如果計算下來,自己與 key 的距離最小,那麼就在自己這裡找有沒有 key 對應的 value。 有的話就返回 value,沒有的話就報錯。 否則的話,把這個資料轉發給距離最小的節點。 收到資料的另一個節點,也採用上述過程進行處理(遞迴處理)。 三、以太坊中p2p通訊的管理模組ProtocolManager /geth.go // Start creates a live P2P node and starts running it. func (n *Node) Start() error { return n.node.Start() } /* Protocol:容納應用程式所要求的回撥函式等.並通過p2p.Server{}在新連線建立後,將其傳遞給通訊物件peer。 Node.Start()中首先會建立p2p.Server{},此時Server中的Protocol[]還是空的; 然後將Node中載入的所有<Service>實現體中的Protocol都收集起來, 一併交給Server物件,作為Server.Protocols列表;然後啟動Server物件, 並將Server物件作為引數去逐一啟動每個<Service>實現體。 */ /node.go // Start create a live P2P node and starts running it. func (n *Node) Start() error { ... /* ... 初始化serverConfig */ running := &p2p.Server{Config: n.serverConfig} ... // Gather the protocols and start the freshly assembled P2P server for _, service := range services { running.Protocols = append(running.Protocols, service.Protocols()...) } if err := running.Start(); err != nil { //見下面的(srv *Server)Start方法 return convertFileLockError(err) } // Start each of the services started := []reflect.Type{} for kind, service := range services { // Start the next service, stopping all previous upon failure //啟動每個services通過下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error { if err := service.Start(running); err != nil { for _, kind := range started { services[kind].Stop() } running.Stop() return err } ... } } // Start starts running the server. // Servers can not be re-used after stopping. func (srv *Server) Start() (err error) { srv.lock.Lock() //srv.lock為了避免多執行緒重複啟動 defer srv.lock.Unlock() if srv.running { return errors.New("server already running") } srv.running = true srv.log = srv.Config.Logger if srv.log == nil { srv.log = log.New() } if srv.NoDial && srv.ListenAddr == "" { srv.log.Warn("P2P server will be useless, neither dialing nor listening") } // static fields if srv.PrivateKey == nil { return fmt.Errorf("Server.PrivateKey must be set to a non-nil key") } //newTransport使用了newRLPX使用了rlpx.go中的網路協議。 if srv.newTransport == nil { srv.newTransport = newRLPX } if srv.Dialer == nil { srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}} } srv.quit = make(chan struct{}) srv.addpeer = make(chan *conn) srv.delpeer = make(chan peerDrop) srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *enode.Node) srv.removestatic = make(chan *enode.Node) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{}) //srv.setupLocalNode()這裡主要執行握手 if err := srv.setupLocalNode(); err != nil { return err } if srv.ListenAddr != "" { //監聽TCP埠-->用於業務資料傳輸,基於RLPx協議) //在setupListening中有個go srv.listenLoop()去監聽某個埠有無主動發來的IP連線 if err := srv.setupListening(); err != nil { return err } } //偵聽UDP埠(用於結點發現內部會啟動goroutine) if err := srv.setupDiscovery(); err != nil { return err } dynPeers := srv.maxDialedConns() dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) srv.loopWG.Add(1) // 啟動新執行緒發起TCP連線請求 //在run()函式中,監聽srv.addpeer通道有沒有資訊如果有遠端peer發來連線請求, //則呼叫Server.newPeer()生成新的peer物件,並把Server.Protocols全交給peer。 /* case c := <-srv.addpeer: err := srv.protoHandshakeChecks(peers, inboundCount, c) if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) } */ go srv.run(dialer) return nil } // Start implements node.Service, starting all internal goroutines needed by the // Ethereum protocol implementation. func (s *Ethereum) Start(srvr *p2p.Server) error { // Start the bloom bits servicing goroutines s.startBloomHandlers(params.BloomBitsBlocks) // Start the RPC service s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) // Figure out a max peers count based on the server limits maxPeers := srvr.MaxPeers if s.config.LightServ > 0 { if s.config.LightPeers >= srvr.MaxPeers { return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers) } maxPeers -= s.config.LightPeers } // Start the networking layer and the light server if requested s.protocolManager.Start(maxPeers) if s.lesServer != nil { s.lesServer.Start(srvr) } return nil } /eth/handler.go type ProtocolManager struct { networkID uint64 fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) txpool txPool blockchain *core.BlockChain chainconfig *params.ChainConfig maxPeers int //Downloader型別成員負責所有向相鄰個體主動發起的同步流程。 downloader *downloader.Downloader //Fetcher型別成員累積所有其他個體傳送來的有關新資料的宣佈訊息,並在自身對照後做出安排 fetcher *fetcher.Fetcher //用來快取相鄰個體列表,peer{}表示網路中的一個遠端個體。 peers *peerSet SubProtocols []p2p.Protocol eventMux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription minedBlockSub *event.TypeMuxSubscription //通過各種通道(chan)和事件訂閱(subscription)的方式,接收和傳送包括交易和區塊在內的資料更新。 //當然在應用中,訂閱也往往利用通道來實現事件通知。 // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer txsyncCh chan *txsync quitSync chan struct{} noMorePeers chan struct{} // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup } Start()函式是ProtocolManager的啟動函式,它會在eth.Ethereum.Start()中被主動呼叫。 ProtocolManager.Start()會啟用4個單獨執行緒(goroutine,協程)去分別執行4個函式, 這也標誌著該以太坊個體p2p通訊的全面啟動。 func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions //廣播交易的通道。 txsCh會作為txpool的TxPreEvent訂閱通道。 //txpool有了這種訊息會通知給這個txsCh。 廣播交易的goroutine會把這個訊息廣播出去。 pm.txsCh = make(chan core.NewTxsEvent, txChanSize) //訂閱交易資訊 pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() //訂閱挖礦訊息。當新的Block被挖出來的時候會產生訊息 // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) //挖礦廣播 goroutine 當挖出來的時候需要儘快的廣播到網路上面去。 go pm.minedBroadcastLoop() // start sync handlers // 同步器負責週期性地與網路同步,下載雜湊和塊以及處理通知處理程式。 go pm.syncer() // txsyncLoop負責每個新連線的初始事務同步。 當新的peer出現時, // 轉發所有當前待處理的事務。為了最小化出口頻寬使用,我們一次只發送一個小包。 go pm.txsyncLoop() } //txBroadcastLoop()會在txCh通道的收端持續等待,一旦接收到有關新交易的事件, //會立即呼叫BroadcastTx()函式廣播給那些尚無該交易物件的相鄰個體。 //------------------go pm.txBroadcastLoop()----------------------- func (pm *ProtocolManager) txBroadcastLoop() { for { select { case event := <-pm.txsCh: pm.BroadcastTxs(event.Txs) // Err() channel will be closed when unsubscribing. case <-pm.txsSub.Err(): return } } } // BroadcastTxs will propagate a batch of transactions to all peers which are not known to // already have the given transaction. func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { var txset = make(map[*peer]types.Transactions) // Broadcast transactions to a batch of peers not knowing about it for _, tx := range txs { peers := pm.peers.PeersWithoutTx(tx.Hash()) for _, peer := range peers { txset[peer] = append(txset[peer], tx) } log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) } // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for peer, txs := range txset { peer.AsyncSendTransactions(txs) } } // AsyncSendTransactions queues list of transactions propagation to a remote // peer. If the peer's broadcast queue is full, the event is silently dropped. // 同步Tx給每個相鄰未知Tx的peer func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { select { case p.queuedTxs <- txs: for _, tx := range txs { p.knownTxs.Add(tx.Hash()) } default: p.Log().Debug("Dropping transaction propagation", "count", len(txs)) } } //------------------go pm.minedBroadcastLoop()----------------------- // Mined broadcast loop //挖礦之後的廣播 func (pm *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe for obj := range pm.minedBlockSub.Chan() { //Data為interface{} ,使用介面斷言的方法將Data轉化為型別NewMinedBlockEvent if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { //BroadcastBlock的第二個引數為true時,會將整個新區塊依次發給相鄰區塊中的一小部分; //而當其為false時,僅僅將新區塊的Hash值和Number傳送給所有相鄰列表。 pm.BroadcastBlock(ev.Block, true) // First propagate block to peers pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest } } } //---------------------------go pm.syncer()----------------------------- /* syncer()首先啟動fetcher成員,然後進入一個無限迴圈, 每次迴圈中都會向相鄰peer列表中“最優”的那個peer作一次區塊全鏈同步。 發起上述同步的理由分兩種:如果有新登記(加入)的相鄰個體,則在整個peer列表數目大於5時, 發起之;如果沒有新peer到達,則以10s為間隔定時的發起之。 這裡所謂"最優"指的是peer中所維護區塊鏈的TotalDifficulty(td)最高, 由於Td是全鏈中從創世塊到最新頭塊的Difficulty值總和, 所以Td值最高就意味著它的區塊鏈是最新的,跟這樣的peer作區塊全鏈同步, 顯然改動量是最小的,此即"最優"。 */ // syncer is responsible for periodically synchronising with the network, both // downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms //啟動 fetcher,輔助同步區塊資料 pm.fetcher.Start() defer pm.fetcher.Stop() defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations forceSync := time.NewTicker(forceSyncCycle) defer forceSync.Stop() for { select { case <-pm.newPeerCh: // Make sure we have peers to select from, then sync if pm.peers.Len() < minDesiredPeerCount { break } go pm.synchronise(pm.peers.BestPeer()) case <-forceSync.C: // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer()) case <-pm.noMorePeers: return } } } // Start boots up the announcement based synchroniser, accepting and processing // hash notifications and block fetches until termination requested. func (f *Fetcher) Start() { go f.loop() } //---------------------go.txsyncLoop()--------------------- // txsyncLoop takes care of the initial transaction sync for each new // connection. When a new peer appears, we relay all currently pending // transactions. In order to minimise egress bandwidth usage, we send // the transactions in small packs to one peer at a time. /*當從網路節點同步過來最新的交易資料後,本地也會把新同步下來的交易資料廣播給網路中的其他節點。 */ func (pm *ProtocolManager) txsyncLoop() { var ( pending = make(map[enode.ID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send ) // send starts a sending a pack of transactions from the sync. send := func(s *txsync) { // Fill pack with transactions up to the target size. size := common.StorageSize(0) pack.p = s.p pack.txs = pack.txs[:0] for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { pack.txs = append(pack.txs, s.txs[i]) size += s.txs[i].Size() } // Remove the transactions that will be sent. s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] if len(s.txs) == 0 { delete(pending, s.p.ID()) } // Send the pack in the background. s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) sending = true go func() { done <- pack.p.SendTransactions(pack.txs) }() } // pick chooses the next pending sync. pick := func() *txsync { if len(pending) == 0 { return nil } n := rand.Intn(len(pending)) + 1 for _, s := range pending { if n--; n == 0 { return s } } return nil } for { select { case s := <-pm.txsyncCh: pending[s.p.ID()] = s if !sending { send(s) } case err := <-done: sending = false // Stop tracking peers that cause send failures. if err != nil { pack.p.Log().Debug("Transaction send failed", "err", err) delete(pending, pack.p.ID()) } // Schedule the next send. if s := pick(); s != nil { send(s) } case <-pm.quitSync: return } } } 四個管道主要的功能都是圍繞廣播和同步展開的 (廣播區塊、廣播交易,同步到區塊、同步到交易,再廣播區塊、廣播交易。) /* 對於peer間通訊而言,除了己方需要主動向對方peer發起通訊(比如Start()中啟動的四個獨立流程)之外, 還需要一種由對方peer主動呼叫的資料傳輸,這種傳輸不僅僅是由對方peer發給己方, 更多的用法是對方peer主動呼叫一個函式讓己方發給它們某些特定資料。這種通訊方式, 在程式碼實現上適合用回撥(callback)來實現。 ProtocolManager.handle()就是這樣一個函式,它會在ProtocolManager物件建立時, 以回撥函式的方式“埋入”每個p2p.Protocol物件中(實現了Protocol.Run()方法)。 之後每當有新peer要與己方建立通訊時,如果對方能夠支援該Protocol, 那麼雙方就可以順利的建立並開始通訊。以下是handle()的基本程式碼: */ // handle is the callback invoked to manage the life cycle of an eth peer. When // this function terminates, the peer is disconnected. func (pm *ProtocolManager) handle(p *peer) error { // Ignore maxPeers if this is a trusted peer if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { return p2p.DiscTooManyPeers } p.Log().Debug("Ethereum peer connected", "name", p.Name()) // Execute the Ethereum handshake var ( genesis = pm.blockchain.Genesis() head = pm.blockchain.CurrentHeader() hash = head.Hash() number = head.Number.Uint64() td = pm.blockchain.GetTd(hash, number) ) //握手,與對方peer溝通己方的區塊鏈狀態 if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil { p.Log().Debug("Ethereum handshake failed", "err", err) return err } //初始化一個讀寫通道,用以跟對方peer相互資料傳輸。 if rw, ok := p.rw.(*meteredMsgReadWriter); ok { rw.Init(p.version) } // Register the peer locally //註冊對方peer,存入己方peer列表;只有handle()函式退出時,才會將這個peer移除出列表。 if err := pm.peers.Register(p); err != nil { p.Log().Error("Ethereum peer registration failed", "err", err) return err } defer pm.removePeer(p.id) //Downloader成員註冊這個新peer;Downloader會自己維護一個相鄰peer列表。 // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { return err } // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. /* 呼叫syncTransactions(),用當前txpool中新累計的tx物件組裝成一個txsync{}物件, 推送到內部通道txsyncCh。還記得Start()啟動的四個函式麼? 其中第四項txsyncLoop() 中用以等待txsync{}資料的通道txsyncCh,正是在這裡被推入txsync{}的。 */ pm.syncTransactions(p) // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { // Request the peer's DAO fork header for extra-data validation if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil { return err } // Start a timer to disconnect if the peer doesn't reply in time p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() { p.Log().Debug("Timed out DAO fork-check, dropping") pm.removePeer(p.id) }) // Make sure it's cleaned up if the peer dies off defer func() { if p.forkDrop != nil { p.forkDrop.Stop() p.forkDrop = nil } }() } //在無限迴圈中啟動handleMsg(),當對方peer發出任何msg時, //handleMsg()可以捕捉相應型別的訊息並在己方進行處理。 // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { p.Log().Debug("Ethereum message handling failed", "err", err) return err } } }