1. 程式人生 > >以太坊原始碼解讀(9)以太坊的P2P模組解析——底層網路構建和啟動

以太坊原始碼解讀(9)以太坊的P2P模組解析——底層網路構建和啟動

以太坊的底層p2pServer,大約可以分為三層:

1、底層:table物件、node物件,它們分別定義了底層的路由表以及本地節點的資料結構、搜尋和驗證;
    1)database.go         //封裝node資料庫相關操作
    2)node.go             //節點資料結構
    3)ntp.go              //同步時間 
    4)table.go            //路由表
    5)udp.go              //網路相關操作

2、中層:peer物件定義了遠端節點、message物件開放傳送介面、server物件則提供peer節點的檢測、初始化、事件訂閱、狀態查詢、啟動和停止等功能;
    1)dial.go          //封裝一個任務生成處理結構以及三種任務結構中(此處命名不太精確)
    2)message.go       //定義一些資料的讀寫介面,以及對外的Send/SendItem函式
    3)peer.go          //封裝了Peer 包括訊息讀取  
    4)rlpx.go          //內部的握手協議
    5)server.go        //初始化,維護Peer網路,還有一些對外的介面

3、頂層:在eth/peer.go中對p2p/peer.go的peer再封裝,包含了對該節點廣播的更多區塊鏈的資訊,如交易、交易hash、區塊以及區塊頭hash等。peer最終會被收集在peerset中使用。
    1)eth/peer.go    // 封裝了peer和peerset兩個結構體以及一些廣播資料的方法
    2)eth/handler.go  // 封裝了很多協議管理工具

一、p2p.Server基本結構

Server用於管理所有的peer連線:

type Server struct {
	// Config包含了所有Server的配置選項,Service第一啟動的時候Config未必初始化
	Config

	// 用於測試的hooks
	newTransport func(net.Conn) transport
	newPeerHook  func(*Peer)

	lock    sync.Mutex // protects running
	running bool

	ntab         discoverTable  // 包括Self()、Close()、Resolve()、Lookup()、ReadRandomNodes()等函式的介面
	listener     net.Listener
	ourHandshake *protoHandshake // 協議握手的RLP結構
	lastLookup   time.Time
	DiscV5       *discv5.Network // 基於V5發現協議的topic-discovery網路

	// 下面是關於peer的操作
	peerOp     chan peerOpFunc
	peerOpDone chan struct{}

	quit          chan struct{}
	addstatic     chan *discover.Node
	removestatic  chan *discover.Node
	addtrusted    chan *discover.Node
	removetrusted chan *discover.Node
	posthandshake chan *conn
	addpeer       chan *conn
	delpeer       chan peerDrop
	loopWG        sync.WaitGroup // loop, listenLoop
	peerFeed      event.Feed
	log           log.Logger
}

再看看Server的配置:

type Config struct {
	
	PrivateKey *ecdsa.PrivateKey `toml:"-"`  // 本地節點的祕鑰
	MaxPeers int  // 可連線的節點最大數值

	// maxpendingpeer是在握手階段中可以掛起的最大對等點數量,分別計算入站連線和出站連線。預設值的預設值為零
	MaxPendingPeers int `toml:",omitempty"`

	// 撥號比率控制入站與撥號連線的比率。例如:撥號比率為2允許1/2的連線被撥號。設定撥號比率為零預設為3。
	DialRatio int `toml:",omitempty"`

	NoDiscovery bool // NoDiscovery 用來禁用節點發現機制,常用於協議debugging
	DiscoveryV5 bool `toml:",omitempty"` // DiscoveryV5 決定了是否啟用topic-discovery協議

	Name string `toml:"-"` // 設定節點名稱
	BootstrapNodes []*discover.Node // BootstrapNodes 用於建立與網路其餘部分的連線。
	BootstrapNodesV5 []*discv5.Node `toml:",omitempty"` // BootstrapNodesV5 用於建立與網路其餘部分的V5連線。

	StaticNodes []*discover.Node // 靜態節點用作預先配置的連線,在斷開連線時總是維護和重新連線。
	TrustedNodes []*discover.Node // 可信節點被用作預先配置的連線,這些連線總是允許連線的,甚至超過對等限制。

	// 連線可以被限制到某些IP網路。如果將此選項設定為非nil值,則只考慮與列表中包含的IP網路之一匹配的主機。
	NetRestrict *netutil.Netlist `toml:",omitempty"`

	// NodeDatabase是到資料庫的路徑,其中包含以前在網路中看到的活動節點
	NodeDatabase string `toml:",omitempty"`

	// 協議應該包含伺服器支援的協議。為每個對等點啟動匹配協議。
	Protocols []Protocol `toml:"-"`

	// 如果ListenAddr設定為非nil地址,伺服器將偵聽傳入的連線。
        // 如果埠為零,作業系統將選擇一個埠。當伺服器啟動時,ListenAddr欄位將更新實際地址。
	ListenAddr string

	// 如果設定為非nil值,則使用給定的NAT埠對映器使偵聽埠對Internet可用。
	NAT nat.Interface `toml:",omitempty"`

	// 如果撥號器設定為非空值,則使用給定的撥號器撥號出站peer連線。
	Dialer NodeDialer `toml:"-"`

	// 如果NoDial是真的,伺服器將不會撥任何節點。
	NoDial bool `toml:",omitempty"`

	// 如果EnableMsgEvents被設定,那麼當訊息傳送到peer或從peer接收到時,伺服器將發出PeerEvents
	EnableMsgEvents bool

	// 日誌記錄器是一個自定義日誌記錄器
	Logger log.Logger `toml:",omitempty"`
}

配置裡包括了服務可連線的節點(靜態節點、信任節點、剩餘節點)、節點發現機制、協議列表(包括ethereum協議)、埠控制。

二、p2p.Server的啟動:sever.Start()

-> geth
 -> startNode  // 首先要啟動節點 
    -> utils.StartNode
    -> Node.Start
       -> eth.Start   // 啟動以太坊物件
          -> protocolManager.start()  // 開啟協議管理器
             -> go txBroadCastLoop
             -> go minedBroadCastLoop
             -> go txsyncLoop
             -> go syncer 
       -> server.Start  // 啟動服務
          -> ListenUDP  // 監聽UDP埠,以太坊節點之間通訊使用的是UDP協議
          -> newUDP    // 新建UDP
          -> newTable  // 新建路由表 

 -> utils.RegisterEthService // 註冊以太坊服務
    -> eth.New      // 新建以太坊物件
      -> core.SetupGenesisBlock
      -> core.NewBlockChian
      -> core.NewTxPool
      -> protocol.Manger

之前分析以太坊啟動流程的時候提到,啟動節點後有兩條路線,一個是註冊並啟動以太坊服務,另一個就是啟動p2p server。Start()函式主要做了三件事:

a. 生成路由表於建立底層網路;
b. 生成DialState用於驅動維護本地peer的更新與死亡;
c. 監聽本地介面用於資訊應答。

現在來看看server.Start()的原始碼:

1)首先要設定Server的基本靜態屬性

func (srv *Server) Start() (err error) {
	srv.lock.Lock()
	defer srv.lock.Unlock()
	if srv.running {
		return errors.New("server already running")
	}
	srv.running = true
	srv.log = srv.Config.Logger
	if srv.log == nil {
		srv.log = log.New()
	}
	srv.log.Info("Starting P2P networking")

	// 配置p2p服務
	if srv.PrivateKey == nil {
		return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
	}
	if srv.newTransport == nil {
		srv.newTransport = newRLPX
	}
	if srv.Dialer == nil {
		srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
	}
	srv.quit = make(chan struct{})
	srv.addpeer = make(chan *conn)
	srv.delpeer = make(chan peerDrop)
	srv.posthandshake = make(chan *conn)
	srv.addstatic = make(chan *discover.Node)
	srv.removestatic = make(chan *discover.Node)
	srv.addtrusted = make(chan *discover.Node)
	srv.removetrusted = make(chan *discover.Node)
	srv.peerOp = make(chan peerOpFunc)
	srv.peerOpDone = make(chan struct{})

2)task1:配置Server的網路,生成路由表

	var (
		conn      *net.UDPConn  
		sconn     *sharedUDPConn 
		realaddr  *net.UDPAddr 
		unhandled chan discover.ReadPacket
	)

其中,sharedUDPConn實現一個共享連線。Write將訊息傳送到基礎連線,而read將返回發現不可處理的訊息,並由主偵聽器傳送到未處理的通道。

        // 解析本地地址,開啟UDP埠
	if !srv.NoDiscovery || srv.DiscoveryV5 {
		addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
		if err != nil {
			return err
		}
		conn, err = net.ListenUDP("udp", addr)
		if err != nil {
			return err
		}
                // 將UDP的埠註冊(map)到NAT網路,使內網程式獲得真實的外網IP地址
		realaddr = conn.LocalAddr().(*net.UDPAddr)
		if srv.NAT != nil {
			if !realaddr.IP.IsLoopback() {
				go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
			}
			// TODO: react to external IP changes over time.
			if ext, err := srv.NAT.ExternalIP(); err == nil {
				realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
			}
		}
	}

        // 設定unhandled通道以及共享的UDP網路
	if !srv.NoDiscovery && srv.DiscoveryV5 {
		unhandled = make(chan discover.ReadPacket, 100)
		sconn = &sharedUDPConn{conn, unhandled}
	}

	// 配置一個discoverTable介面,用以配置並生成節點路由表table
	if !srv.NoDiscovery {
		cfg := discover.Config{
			PrivateKey:   srv.PrivateKey,
			AnnounceAddr: realaddr,
			NodeDBPath:   srv.NodeDatabase,
			NetRestrict:  srv.NetRestrict,
			Bootnodes:    srv.BootstrapNodes,
			Unhandled:    unhandled,
		}
		ntab, err := discover.ListenUDP(conn, cfg)
		if err != nil {
			return err
		}
		srv.ntab = ntab
	}

        // 配置一個DiscoveryV5網路協議,生成節點路由表table
	if srv.DiscoveryV5 {
		var (
			ntab *discv5.Network
			err  error
		)
		if sconn != nil {
			ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
		} else {
			ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
		}
		if err != nil {
			return err
		}
		if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
			return err
		}
		srv.DiscV5 = ntab
	}

discover.ListenUDP和discv5.ListenUDP均返回一個新表,偵聽laddr上的UDP資料包。discV5還是一個實驗性質的網路協議,我們先重點看普通的discover.ListenUDP。

func ListenUDP(c conn, cfg Config) (*Table, error) {
	tab, _, err := newUDP(c, cfg)
	if err != nil {
		return nil, err
	}
	log.Info("UDP listener up", "self", tab.self)
	return tab, nil
}

newUDP(c,cfg)函式配置了udp物件,呼叫newTable()生成了路由表,並將其返回,同時開啟了兩個協程,分別是開啟等待Pending reply協程和網路資料獲取協程(處理收到的UDP packet)。

func newUDP(c conn, cfg Config) (*Table, *udp, error) {
	udp := &udp{
		conn:        c,
		priv:        cfg.PrivateKey,
		netrestrict: cfg.NetRestrict,
		closing:     make(chan struct{}),
		gotreply:    make(chan reply),
		addpending:  make(chan *pending),
	}
	realaddr := c.LocalAddr().(*net.UDPAddr)
	if cfg.AnnounceAddr != nil {
		realaddr = cfg.AnnounceAddr
	}
	// TODO: separate TCP port
	udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
	tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
	if err != nil {
		return nil, nil, err
	}
	udp.Table = tab

	go udp.loop()
	go udp.readLoop(cfg.Unhandled)
	return udp.Table, udp, nil
}

3)task2:生成DialState用於驅動維護本地peer的更新與死亡

	dynPeers := srv.maxDialedConns()
        //newDialState 物件生成,這個物件包含Peer的實際維護程式碼
	dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

        ...

        //重要的一句,開個協程,在其中做peer的維護
	go srv.run(dialer)
	srv.running = true
	return nil
}

4)task3:啟動TCP埠,監聽本地介面用於資訊應答

	// handshake  協議載入
	srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
	for _, p := range srv.Protocols {
		srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
	}
	// listen/dial
        //監聽本地埠
	if srv.ListenAddr != "" {
		if err := srv.startListening(); err != nil {
			return err
		}
	}
	if srv.NoDial && srv.ListenAddr == "" {
		srv.log.Warn("P2P server will be useless, neither dialing nor listening")
	}

三、server.Run()

Server.Start()中通過Server.startListening()啟動一個單獨執行緒(listenLoop())去監聽某個埠有無主動發來的IP連線;另外再啟動一個單獨執行緒呼叫run()函式,在無限迴圈裡處理接收到的任何新訊息新物件。在run()函式中,如果有遠端peer發來連線請求(新的p2p.conn{}),則呼叫Server.newPeer()生成新的peer物件,並把Server.Protocols全交給peer。

Server.run()函式接受的是一個dialstate物件,它控制著dial事務和discover lookup事務。

type dialstate struct {
	maxDynDials int
	ntab        discoverTable // 該介面包含Lookup()方法
	netrestrict *netutil.Netlist

	lookupRunning bool
	dialing       map[discover.NodeID]connFlag
	lookupBuf     []*discover.Node // 當前lookup的結果
	randomNodes   []*discover.Node // 從table中隨機選出的Node
	static        map[discover.NodeID]*dialTask
	hist          *dialHistory

	start     time.Time        // time when the dialer was first used
	bootnodes []*discover.Node // 當沒有peers的時候預設dial的節點
}

下面我們看看Server.run(dialer)執行的過程。首先,函式啟動後,在記憶體中定義下面幾個變數:

	var (
		peers        = make(map[discover.NodeID]*Peer)
		inboundCount = 0
		trusted      = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
		taskdone     = make(chan task, maxActiveDialTasks)
		runningTasks []task
		queuedTasks  []task // tasks that can't run yet
	)

peers:所有建立了連線的peer;
trusted:信任的節點,因為dial其他節點的時候都要進行驗證,對信任的節點可以加速驗證過程;
runningTasks和queuedTasks分別是正在執行的任務和待執行的任務。

task是什麼?

在dial.go中定義了三類task:dialTask,discoverTask和waitExpireTask,以及一個task介面:

type task interface {
	Do(*Server)
}
// 每向一個節點發起連線就會生成一個dialTask
type dialTask struct {
	flags        connFlag
	dest         *discover.Node
	lastResolved time.Time
	resolveDelay time.Duration
}
// 有時候動態連線的節點不足時,必須要到table中查詢新的節點
type discoverTask struct {
	results []*discover.Node
}
// waitExpireTask用來保證當沒有其他任務時Server.run()的持續執行
type waitExpireTask struct {
	time.Duration
}

至於這三種任務何時生成,則屬於newTasks()的控制範圍。

然後,Server.run()中定義了對tasks的排程函式,用來管理上述的兩個[]task,同時對task進行執行:task.Do()。

	// removes t from runningTasks
	delTask := func(t task) {
		for i := range runningTasks {
			if runningTasks[i] == t {
				runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
				break
			}
		}
	}
	// starts until max number of active tasks is satisfied
	startTasks := func(ts []task) (rest []task) {
		i := 0
		for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
			t := ts[i]
			srv.log.Trace("New dial task", "task", t)
			go func() { t.Do(srv); taskdone <- t }()
			runningTasks = append(runningTasks, t)
		}
		return ts[i:]
	}
	scheduleTasks := func() {
		// 先從佇列中開始執行task
		queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
		// 儘可能多的啟動task,所以會新建task進行執行
		if len(runningTasks) < maxActiveDialTasks {
			nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
			queuedTasks = append(queuedTasks, startTasks(nt)...)
		}
	}

當這些都準備完後,Server.run()的主體就要正式執行了,一個無限執行的for迴圈,監聽Server各通道的傳值,然後執行相應的任務:

	for {
		scheduleTasks()

		select {
		case <-srv.quit: break running
		case n := <-srv.addstatic:
			srv.log.Trace("Adding static node", "node", n)
			dialstate.addStatic(n)
		case n := <-srv.removestatic:
			srv.log.Trace("Removing static node", "node", n)
			dialstate.removeStatic(n)
			if p, ok := peers[n.ID]; ok {
				p.Disconnect(DiscRequested)
			}
		case n := <-srv.addtrusted:
			srv.log.Trace("Adding trusted node", "node", n)
			trusted[n.ID] = true
			if p, ok := peers[n.ID]; ok {
				p.rw.set(trustedConn, true)
			}
		case n := <-srv.removetrusted:
			srv.log.Trace("Removing trusted node", "node", n)
			if _, ok := trusted[n.ID]; ok {
				delete(trusted, n.ID)
			}
			if p, ok := peers[n.ID]; ok {
				p.rw.set(trustedConn, false)
			}
		case op := <-srv.peerOp:
			op(peers)
			srv.peerOpDone <- struct{}{}
		case t := <-taskdone:
			srv.log.Trace("Dial task done", "task", t)
			dialstate.taskDone(t, time.Now())
			delTask(t)
		case c := <-srv.posthandshake:
			if trusted[c.id] {
				c.flags |= trustedConn
			}
			select {
			case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
			case <-srv.quit:
				break running
			}
		case c := <-srv.addpeer:
			err := srv.protoHandshakeChecks(peers, inboundCount, c)
			if err == nil {
				p := newPeer(c, srv.Protocols)
				if srv.EnableMsgEvents {
					p.events = &srv.peerFeed
				}
				name := truncateName(c.name)
				srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
				go srv.runPeer(p)
				peers[c.id] = p
				if p.Inbound() {
					inboundCount++
				}
			}
			select {
			case c.cont <- err:
			case <-srv.quit:
				break running
			}
		case pd := <-srv.delpeer:
			d := common.PrettyDuration(mclock.Now() - pd.created)
			pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
			delete(peers, pd.ID())
			if pd.Inbound() {
				inboundCount--
			}
		}
	}

每次迴圈都先執行scheduleTasks(),然後通過StartTasks()開啟一個協程執行task.Do(),然後下面監聽各通道的傳值。多數情況下,task就是dialTask,同一時間內discoverTask只能存在一個。task.Do()所做的事情是呼叫dial()方法,向task.dest中的節點發起連線,後者的執行路徑是:

task.dial()
    ——> Server.SetupConn()
        ——> Server.checkpoint(c, srv.posthandshake)
            ——>Server.checkpoint(c, srv.addpeer)

按照for迴圈,Server的addpeer通道傳出conn物件的時候,就會執行newPeer(),然後啟動一個協程執行peer.Run(),從而實現兩個節點之間的連線。

整個過程如下圖所示: