以太坊原始碼解讀(9)以太坊的P2P模組解析——底層網路構建和啟動
以太坊的底層p2pServer,大約可以分為三層:
1、底層:table物件、node物件,它們分別定義了底層的路由表以及本地節點的資料結構、搜尋和驗證;
1)database.go //封裝node資料庫相關操作
2)node.go //節點資料結構
3)ntp.go //同步時間
4)table.go //路由表
5)udp.go //網路相關操作2、中層:peer物件定義了遠端節點、message物件開放傳送介面、server物件則提供peer節點的檢測、初始化、事件訂閱、狀態查詢、啟動和停止等功能;
1)dial.go //封裝一個任務生成處理結構以及三種任務結構中(此處命名不太精確)
2)message.go //定義一些資料的讀寫介面,以及對外的Send/SendItem函式
3)peer.go //封裝了Peer 包括訊息讀取
4)rlpx.go //內部的握手協議
5)server.go //初始化,維護Peer網路,還有一些對外的介面3、頂層:在eth/peer.go中對p2p/peer.go的peer再封裝,包含了對該節點廣播的更多區塊鏈的資訊,如交易、交易hash、區塊以及區塊頭hash等。peer最終會被收集在peerset中使用。
1)eth/peer.go // 封裝了peer和peerset兩個結構體以及一些廣播資料的方法
2)eth/handler.go // 封裝了很多協議管理工具
一、p2p.Server基本結構
Server用於管理所有的peer連線:
type Server struct { // Config包含了所有Server的配置選項,Service第一啟動的時候Config未必初始化 Config // 用於測試的hooks newTransport func(net.Conn) transport newPeerHook func(*Peer) lock sync.Mutex // protects running running bool ntab discoverTable // 包括Self()、Close()、Resolve()、Lookup()、ReadRandomNodes()等函式的介面 listener net.Listener ourHandshake *protoHandshake // 協議握手的RLP結構 lastLookup time.Time DiscV5 *discv5.Network // 基於V5發現協議的topic-discovery網路 // 下面是關於peer的操作 peerOp chan peerOpFunc peerOpDone chan struct{} quit chan struct{} addstatic chan *discover.Node removestatic chan *discover.Node addtrusted chan *discover.Node removetrusted chan *discover.Node posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop peerFeed event.Feed log log.Logger }
再看看Server的配置:
type Config struct { PrivateKey *ecdsa.PrivateKey `toml:"-"` // 本地節點的祕鑰 MaxPeers int // 可連線的節點最大數值 // maxpendingpeer是在握手階段中可以掛起的最大對等點數量,分別計算入站連線和出站連線。預設值的預設值為零 MaxPendingPeers int `toml:",omitempty"` // 撥號比率控制入站與撥號連線的比率。例如:撥號比率為2允許1/2的連線被撥號。設定撥號比率為零預設為3。 DialRatio int `toml:",omitempty"` NoDiscovery bool // NoDiscovery 用來禁用節點發現機制,常用於協議debugging DiscoveryV5 bool `toml:",omitempty"` // DiscoveryV5 決定了是否啟用topic-discovery協議 Name string `toml:"-"` // 設定節點名稱 BootstrapNodes []*discover.Node // BootstrapNodes 用於建立與網路其餘部分的連線。 BootstrapNodesV5 []*discv5.Node `toml:",omitempty"` // BootstrapNodesV5 用於建立與網路其餘部分的V5連線。 StaticNodes []*discover.Node // 靜態節點用作預先配置的連線,在斷開連線時總是維護和重新連線。 TrustedNodes []*discover.Node // 可信節點被用作預先配置的連線,這些連線總是允許連線的,甚至超過對等限制。 // 連線可以被限制到某些IP網路。如果將此選項設定為非nil值,則只考慮與列表中包含的IP網路之一匹配的主機。 NetRestrict *netutil.Netlist `toml:",omitempty"` // NodeDatabase是到資料庫的路徑,其中包含以前在網路中看到的活動節點 NodeDatabase string `toml:",omitempty"` // 協議應該包含伺服器支援的協議。為每個對等點啟動匹配協議。 Protocols []Protocol `toml:"-"` // 如果ListenAddr設定為非nil地址,伺服器將偵聽傳入的連線。 // 如果埠為零,作業系統將選擇一個埠。當伺服器啟動時,ListenAddr欄位將更新實際地址。 ListenAddr string // 如果設定為非nil值,則使用給定的NAT埠對映器使偵聽埠對Internet可用。 NAT nat.Interface `toml:",omitempty"` // 如果撥號器設定為非空值,則使用給定的撥號器撥號出站peer連線。 Dialer NodeDialer `toml:"-"` // 如果NoDial是真的,伺服器將不會撥任何節點。 NoDial bool `toml:",omitempty"` // 如果EnableMsgEvents被設定,那麼當訊息傳送到peer或從peer接收到時,伺服器將發出PeerEvents EnableMsgEvents bool // 日誌記錄器是一個自定義日誌記錄器 Logger log.Logger `toml:",omitempty"` }
配置裡包括了服務可連線的節點(靜態節點、信任節點、剩餘節點)、節點發現機制、協議列表(包括ethereum協議)、埠控制。
二、p2p.Server的啟動:sever.Start()
-> geth
-> startNode // 首先要啟動節點
-> utils.StartNode
-> Node.Start
-> eth.Start // 啟動以太坊物件
-> protocolManager.start() // 開啟協議管理器
-> go txBroadCastLoop
-> go minedBroadCastLoop
-> go txsyncLoop
-> go syncer
-> server.Start // 啟動服務
-> ListenUDP // 監聽UDP埠,以太坊節點之間通訊使用的是UDP協議
-> newUDP // 新建UDP
-> newTable // 新建路由表
-> utils.RegisterEthService // 註冊以太坊服務
-> eth.New // 新建以太坊物件
-> core.SetupGenesisBlock
-> core.NewBlockChian
-> core.NewTxPool
-> protocol.Manger
之前分析以太坊啟動流程的時候提到,啟動節點後有兩條路線,一個是註冊並啟動以太坊服務,另一個就是啟動p2p server。Start()函式主要做了三件事:
a. 生成路由表於建立底層網路;
b. 生成DialState用於驅動維護本地peer的更新與死亡;
c. 監聽本地介面用於資訊應答。
現在來看看server.Start()的原始碼:
1)首先要設定Server的基本靜態屬性
func (srv *Server) Start() (err error) {
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.New()
}
srv.log.Info("Starting P2P networking")
// 配置p2p服務
if srv.PrivateKey == nil {
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node)
srv.addtrusted = make(chan *discover.Node)
srv.removetrusted = make(chan *discover.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
2)task1:配置Server的網路,生成路由表
var (
conn *net.UDPConn
sconn *sharedUDPConn
realaddr *net.UDPAddr
unhandled chan discover.ReadPacket
)
其中,sharedUDPConn實現一個共享連線。Write將訊息傳送到基礎連線,而read將返回發現不可處理的訊息,並由主偵聽器傳送到未處理的通道。
// 解析本地地址,開啟UDP埠
if !srv.NoDiscovery || srv.DiscoveryV5 {
addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
if err != nil {
return err
}
conn, err = net.ListenUDP("udp", addr)
if err != nil {
return err
}
// 將UDP的埠註冊(map)到NAT網路,使內網程式獲得真實的外網IP地址
realaddr = conn.LocalAddr().(*net.UDPAddr)
if srv.NAT != nil {
if !realaddr.IP.IsLoopback() {
go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
}
// TODO: react to external IP changes over time.
if ext, err := srv.NAT.ExternalIP(); err == nil {
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
}
// 設定unhandled通道以及共享的UDP網路
if !srv.NoDiscovery && srv.DiscoveryV5 {
unhandled = make(chan discover.ReadPacket, 100)
sconn = &sharedUDPConn{conn, unhandled}
}
// 配置一個discoverTable介面,用以配置並生成節點路由表table
if !srv.NoDiscovery {
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
AnnounceAddr: realaddr,
NodeDBPath: srv.NodeDatabase,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
}
ntab, err := discover.ListenUDP(conn, cfg)
if err != nil {
return err
}
srv.ntab = ntab
}
// 配置一個DiscoveryV5網路協議,生成節點路由表table
if srv.DiscoveryV5 {
var (
ntab *discv5.Network
err error
)
if sconn != nil {
ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
} else {
ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
}
if err != nil {
return err
}
if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
return err
}
srv.DiscV5 = ntab
}
discover.ListenUDP和discv5.ListenUDP均返回一個新表,偵聽laddr上的UDP資料包。discV5還是一個實驗性質的網路協議,我們先重點看普通的discover.ListenUDP。
func ListenUDP(c conn, cfg Config) (*Table, error) {
tab, _, err := newUDP(c, cfg)
if err != nil {
return nil, err
}
log.Info("UDP listener up", "self", tab.self)
return tab, nil
}
newUDP(c,cfg)函式配置了udp物件,呼叫newTable()生成了路由表,並將其返回,同時開啟了兩個協程,分別是開啟等待Pending reply協程和網路資料獲取協程(處理收到的UDP packet)。
func newUDP(c conn, cfg Config) (*Table, *udp, error) {
udp := &udp{
conn: c,
priv: cfg.PrivateKey,
netrestrict: cfg.NetRestrict,
closing: make(chan struct{}),
gotreply: make(chan reply),
addpending: make(chan *pending),
}
realaddr := c.LocalAddr().(*net.UDPAddr)
if cfg.AnnounceAddr != nil {
realaddr = cfg.AnnounceAddr
}
// TODO: separate TCP port
udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
if err != nil {
return nil, nil, err
}
udp.Table = tab
go udp.loop()
go udp.readLoop(cfg.Unhandled)
return udp.Table, udp, nil
}
3)task2:生成DialState用於驅動維護本地peer的更新與死亡
dynPeers := srv.maxDialedConns()
//newDialState 物件生成,這個物件包含Peer的實際維護程式碼
dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
...
//重要的一句,開個協程,在其中做peer的維護
go srv.run(dialer)
srv.running = true
return nil
}
4)task3:啟動TCP埠,監聽本地介面用於資訊應答
// handshake 協議載入
srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
for _, p := range srv.Protocols {
srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
}
// listen/dial
//監聽本地埠
if srv.ListenAddr != "" {
if err := srv.startListening(); err != nil {
return err
}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
三、server.Run()
Server.Start()中通過Server.startListening()啟動一個單獨執行緒(listenLoop())去監聽某個埠有無主動發來的IP連線;另外再啟動一個單獨執行緒呼叫run()函式,在無限迴圈裡處理接收到的任何新訊息新物件。在run()函式中,如果有遠端peer發來連線請求(新的p2p.conn{}),則呼叫Server.newPeer()生成新的peer物件,並把Server.Protocols全交給peer。
Server.run()函式接受的是一個dialstate物件,它控制著dial事務和discover lookup事務。
type dialstate struct {
maxDynDials int
ntab discoverTable // 該介面包含Lookup()方法
netrestrict *netutil.Netlist
lookupRunning bool
dialing map[discover.NodeID]connFlag
lookupBuf []*discover.Node // 當前lookup的結果
randomNodes []*discover.Node // 從table中隨機選出的Node
static map[discover.NodeID]*dialTask
hist *dialHistory
start time.Time // time when the dialer was first used
bootnodes []*discover.Node // 當沒有peers的時候預設dial的節點
}
下面我們看看Server.run(dialer)執行的過程。首先,函式啟動後,在記憶體中定義下面幾個變數:
var (
peers = make(map[discover.NodeID]*Peer)
inboundCount = 0
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
taskdone = make(chan task, maxActiveDialTasks)
runningTasks []task
queuedTasks []task // tasks that can't run yet
)
peers:所有建立了連線的peer;
trusted:信任的節點,因為dial其他節點的時候都要進行驗證,對信任的節點可以加速驗證過程;
runningTasks和queuedTasks分別是正在執行的任務和待執行的任務。
task是什麼?
在dial.go中定義了三類task:dialTask,discoverTask和waitExpireTask,以及一個task介面:
type task interface { Do(*Server) } // 每向一個節點發起連線就會生成一個dialTask type dialTask struct { flags connFlag dest *discover.Node lastResolved time.Time resolveDelay time.Duration } // 有時候動態連線的節點不足時,必須要到table中查詢新的節點 type discoverTask struct { results []*discover.Node } // waitExpireTask用來保證當沒有其他任務時Server.run()的持續執行 type waitExpireTask struct { time.Duration }
至於這三種任務何時生成,則屬於newTasks()的控制範圍。
然後,Server.run()中定義了對tasks的排程函式,用來管理上述的兩個[]task,同時對task進行執行:task.Do()。
// removes t from runningTasks
delTask := func(t task) {
for i := range runningTasks {
if runningTasks[i] == t {
runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
break
}
}
}
// starts until max number of active tasks is satisfied
startTasks := func(ts []task) (rest []task) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
return ts[i:]
}
scheduleTasks := func() {
// 先從佇列中開始執行task
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
// 儘可能多的啟動task,所以會新建task進行執行
if len(runningTasks) < maxActiveDialTasks {
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
queuedTasks = append(queuedTasks, startTasks(nt)...)
}
}
當這些都準備完後,Server.run()的主體就要正式執行了,一個無限執行的for迴圈,監聽Server各通道的傳值,然後執行相應的任務:
for {
scheduleTasks()
select {
case <-srv.quit: break running
case n := <-srv.addstatic:
srv.log.Trace("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
srv.log.Trace("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
}
case n := <-srv.addtrusted:
srv.log.Trace("Adding trusted node", "node", n)
trusted[n.ID] = true
if p, ok := peers[n.ID]; ok {
p.rw.set(trustedConn, true)
}
case n := <-srv.removetrusted:
srv.log.Trace("Removing trusted node", "node", n)
if _, ok := trusted[n.ID]; ok {
delete(trusted, n.ID)
}
if p, ok := peers[n.ID]; ok {
p.rw.set(trustedConn, false)
}
case op := <-srv.peerOp:
op(peers)
srv.peerOpDone <- struct{}{}
case t := <-taskdone:
srv.log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
if trusted[c.id] {
c.flags |= trustedConn
}
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
case <-srv.quit:
break running
}
case c := <-srv.addpeer:
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
p := newPeer(c, srv.Protocols)
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p)
peers[c.id] = p
if p.Inbound() {
inboundCount++
}
}
select {
case c.cont <- err:
case <-srv.quit:
break running
}
case pd := <-srv.delpeer:
d := common.PrettyDuration(mclock.Now() - pd.created)
pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
delete(peers, pd.ID())
if pd.Inbound() {
inboundCount--
}
}
}
每次迴圈都先執行scheduleTasks(),然後通過StartTasks()開啟一個協程執行task.Do(),然後下面監聽各通道的傳值。多數情況下,task就是dialTask,同一時間內discoverTask只能存在一個。task.Do()所做的事情是呼叫dial()方法,向task.dest中的節點發起連線,後者的執行路徑是:
task.dial()
——> Server.SetupConn()
——> Server.checkpoint(c, srv.posthandshake)
——>Server.checkpoint(c, srv.addpeer)
按照for迴圈,Server的addpeer通道傳出conn物件的時候,就會執行newPeer(),然後啟動一個協程執行peer.Run(),從而實現兩個節點之間的連線。
整個過程如下圖所示: