1. 程式人生 > >以太坊原始碼解讀(3)以太坊啟動流程簡析

以太坊原始碼解讀(3)以太坊啟動流程簡析

啟動命令:

geth --identity "TestNode1" --datadir "data0" --rpc --rpcapi "db,eth,net,web3" --port "30303" --networkid "29382" --ws --wsorigins="*" --rpccorsdomain="*" console

啟動後,我們可以從日誌來分析程式啟動的流程。

INFO [10-29|12:27:11.737] Maximum peer count
INFO [10-29|12:27:11.747] Starting peer-to-peer node  // node/node.go
INFO [10-29|12:27:11.747] Allocated cache and file handles  // 
INFO [10-29|12:27:11.763] Initialised chain configuration   // eth/backend.go
INFO [10-29|12:27:11.763] Disk storage enabled for ethash caches // consensus/ethash/ethash.go
INFO [10-29|12:27:11.763] Disk storage enabled for ethash DAGs 
INFO [10-29|12:27:11.763] Initialising Ethereum protocol
WARN [10-29|12:27:11.765] Head state missing, repairing chain 
INFO [10-29|12:27:11.775] Rewound blockchain to past state
INFO [10-29|12:27:11.776] Loaded most recent local header
INFO [10-29|12:27:11.776] Loaded most recent local full block
INFO [10-29|12:27:11.776] Loaded most recent local fast block
INFO [10-29|12:27:11.776] Loaded local transaction journal
INFO [10-29|12:27:11.777] Regenerated local transaction journal
WARN [10-29|12:27:11.777] Blockchain not empty, fast sync disabled
INFO [10-29|12:27:11.777] Starting P2P networking
INFO [10-29|12:27:13.928] Mapped network port 
INFO [10-29|12:27:13.990] UDP listener up                         
INFO [10-29|12:27:13.991] RLPx listener up                        
INFO [10-29|12:27:13.995] IPC endpoint opened              
INFO [10-29|12:27:13.995] HTTP endpoint opened 
INFO [10-29|12:27:13.996] WebSocket endpoint opened     
INFO [10-29|12:27:14.001] Mapped network port    
Welcome to the Geth JavaScript console!

啟動流程圖

一、啟動的main函式 cmd/geth/main.go

Go裡面有兩個保留的函式:init函式(能夠應用於所有的package)和main函式(只能應用於package main)。這兩個函式在定義時不能有任何的引數和返回值。

在cmd/geth/main.go中,首先定義app:

cmd/geth/main.go

var (
	// Git SHA1 commit hash of the release (set via linker flags)
	gitCommit = ""
	// The app that holds all commands and flags.
	app = utils.NewApp(gitCommit, "the go-ethereum command line interface")
	// Flag groups; init() appends each of them to app.Flags.
	nodeFlags = []cli.Flag{
		utils.IdentityFlag, // all of these flags come from the cmd/utils package
		...
	}
	rpcFlags     = []cli.Flag{...}
	consoleFlags = []cli.Flag{...} // from cmd/geth/consolecmd.go
	whisperFlags = []cli.Flag{...}
	// Named metricsFlags to match the identifier that init() appends to app.Flags.
	metricsFlags = []cli.Flag{...}
)

然後通過init()函式來初始化app,其中app.Action表示如果使用者沒有輸入其他的子命令的情況下,會呼叫這個欄位指向的函式,即geth()。

geth的命令使用了urfave/cli這個庫,這個庫是go語言命令列程式常用的庫,它把命令列解析的過程做了一下封裝,抽象出flag/command/subcommand這些模組,使用者只需要提供一些模組的配置,引數的解析和關聯在庫內部完成,幫助資訊也可以自動生成。

app.Flags和app.Commands分別設定了支援的[option]和[command]:當從使用者輸入的命令中解析出相應的引數後,程式會找到其指向的特定函式並執行。這裡先不做介紹,後面以console的啟動為例介紹這一部分的原理。

// init configures the package-level CLI app before main runs: it sets geth as
// the default action, registers the subcommands (sorted by name), appends
// every flag group to app.Flags and installs the Before/After hooks.
// Parts of the upstream function are elided in this excerpt (marked "...").
func init() {
	// Initialize the CLI app and start Geth
	app.Action = geth
	app.HideVersion = true // we have a command to print the version
	app.Copyright = "Copyright 2013-2018 The go-ethereum Authors"
        // All supported subcommands.
	app.Commands = []cli.Command{
		// See chaincmd.go:
		initCommand,
                ...
		// See monitorcmd.go:
		// See accountcmd.go:
		// See consolecmd.go:
		// See misccmd.go:
		// See config.go
	}
	sort.Sort(cli.CommandsByName(app.Commands))
    
        // All options (flags) the app is able to parse.
	app.Flags = append(app.Flags, nodeFlags...)
	app.Flags = append(app.Flags, rpcFlags...)
	app.Flags = append(app.Flags, consoleFlags...)
	app.Flags = append(app.Flags, debug.Flags...)
	app.Flags = append(app.Flags, whisperFlags...)
	app.Flags = append(app.Flags, metricsFlags...)

	// Hook assigned to app.Before; its body is elided in this excerpt.
	app.Before = func(ctx *cli.Context) error {
           ...
	}

	// Hook assigned to app.After: shuts down the debug facilities and closes
	// the console's stdin.
	app.After = func(ctx *cli.Context) error {
		debug.Exit()
		console.Stdin.Close() // Resets terminal mode.
		return nil
	}
}

通過上面的程式碼就把我們解析使用者命令的物件設定完成了,下一步就是執行app.Run()。


// main hands the process arguments to the CLI app. If the app reports an
// error, it is printed to stderr and the process exits with status 1.
func main() {
	err := app.Run(os.Args)
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}

在以太坊客戶端geth中,如果什麼命令都不輸入直接執行geth, 就會預設啟動一個全節點模式的節點,連線到主網路。這時候就是按照上面所說的,啟動了geth()函式:

cmd/geth/main.go

// geth is the default action of the CLI app. It rejects any leftover
// positional argument as an invalid command, then builds the full node,
// starts it and blocks on the node's Wait method.
func geth(ctx *cli.Context) error {
	leftover := ctx.Args()
	if len(leftover) != 0 {
		return fmt.Errorf("invalid command: %q", leftover[0])
	}

	stack := makeFullNode(ctx) // build the full node object
	startNode(ctx, stack)      // start the full node
	stack.Wait()

	return nil
}

二、全節點配置

在cmd/geth/main.go中,geth()函式在通過startNode()啟動全節點之前,首先呼叫cmd/geth/config.go中的makeFullNode()函式來建立並配置節點:

// makeFullNode builds the configured node stack and registers services on it:
// the Ethereum service always, and the dashboard, Whisper and Ethereum Stats
// services depending on the flags and configuration. The node is returned
// without being started by this function.
func makeFullNode(ctx *cli.Context) *node.Node {
	// Configure the node and obtain the aggregated geth configuration.
	stack, cfg := makeConfigNode(ctx)

	// Register the eth service.
	utils.RegisterEthService(stack, &cfg.Eth)

	if wantDashboard := ctx.GlobalBool(utils.DashboardEnabledFlag.Name); wantDashboard {
		utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
	}

	// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
	explicit := enableWhisper(ctx)
	implied := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
	if explicit || implied {
		// Flag values override the Whisper configuration only when the flag was set.
		if name := utils.WhisperMaxMessageSizeFlag.Name; ctx.GlobalIsSet(name) {
			cfg.Shh.MaxMessageSize = uint32(ctx.Int(name))
		}
		if name := utils.WhisperMinPOWFlag.Name; ctx.GlobalIsSet(name) {
			cfg.Shh.MinimumAcceptedPOW = ctx.Float64(name)
		}
		utils.RegisterShhService(stack, &cfg.Shh)
	}

	// Add the Ethereum Stats daemon if requested.
	if statsURL := cfg.Ethstats.URL; statsURL != "" {
		utils.RegisterEthStatsService(stack, statsURL)
	}

	return stack
}

可以看出,makeFullNode首先通過makeConfigNode(ctx)對節點進行配置(包括Eth、Shh、Node、Dashboard),返回Node和geth配置,然後開啟兩條路線:1、通過Node.Start()——>Server.Start()啟動p2p服務;2、通過RegisterEthService將Ethereum服務註冊到Node的services map[reflect.Type]Service中,通過Node.Start()來啟動Ethereum服務。

// makeConfigNode assembles the geth configuration in three layers (defaults,
// then the optional config file, then command-line flags) and creates the
// node from the resulting node configuration. It returns the created node and
// the full configuration; failures are reported through utils.Fatalf.
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
	// Layer 1: defaults of every component.
	var cfg gethConfig
	cfg.Eth = eth.DefaultConfig
	cfg.Shh = whisper.DefaultConfig
	cfg.Node = defaultNodeConfig()
	cfg.Dashboard = dashboard.DefaultConfig

	// Layer 2: the config file, when one was named on the command line.
	cfgFile := ctx.GlobalString(configFileFlag.Name)
	if cfgFile != "" {
		err := loadConfig(cfgFile, &cfg)
		if err != nil {
			utils.Fatalf("%v", err)
		}
	}

	// Layer 3: flags. The node config is applied first because the node
	// created from it is passed to the eth and shh config setters below.
	utils.SetNodeConfig(ctx, &cfg.Node)
	stack, err := node.New(&cfg.Node)
	if err != nil {
		utils.Fatalf("Failed to create the protocol stack: %v", err)
	}

	utils.SetEthConfig(ctx, stack, &cfg.Eth)
	if name := utils.EthStatsURLFlag.Name; ctx.GlobalIsSet(name) {
		cfg.Ethstats.URL = ctx.GlobalString(name)
	}
	utils.SetShhConfig(ctx, stack, &cfg.Shh)
	utils.SetDashboardConfig(ctx, &cfg.Dashboard)

	return stack, cfg
}

三、註冊ETH服務

makeFullNode()函式裡在做好節點配置之後,再呼叫cmd/utils/flags.go裡RegisterEthService()函式註冊eth服務,而這個函式是呼叫了node/node.go模組中的Register()方法,這個方法的引數是一個建構函式(constructor),正如下面的程式碼所示:

cmd/utils/flags.go

// RegisterEthService registers an Ethereum service constructor on the stack.
//
// When cfg.SyncMode is downloader.LightSync the constructor builds a light
// client with les.New. Otherwise it builds a full node with eth.New and, if
// cfg.LightServ > 0, also creates a LES server and attaches it to the full
// node. A failure of stack.Register itself is reported through Fatalf.
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
	var err error
	if cfg.SyncMode == downloader.LightSync {
		err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
			return les.New(ctx, cfg)
		})
	} else {
		err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
			fullNode, err := eth.New(ctx, cfg)
			if fullNode != nil && cfg.LightServ > 0 {
				// Surface a LES server construction failure as the
				// constructor's error instead of discarding it and attaching
				// whatever value was returned alongside it.
				ls, lesErr := les.NewLesServer(fullNode, cfg)
				if lesErr != nil {
					return nil, lesErr
				}
				fullNode.AddLesServer(ls)
			}
			return fullNode, err
		})
	}
	if err != nil {
		Fatalf("Failed to register the Ethereum service: %v", err)
	}
}

這個函式裡會判斷同步的方式 ,如果是LightSync則會使用les.New()建立輕節點,否則就使用eth.New()建立全節點,這裡我們還是建立全節點,即呼叫eth.New()方法。

eth/backend.go

// New builds an Ethereum object from the service context and the eth config.
// In the order shown here it: obtains the chain database, sets up the genesis
// block, assembles the Ethereum struct (including the consensus engine and
// bloom indexer), creates the blockchain, the transaction pool, the protocol
// manager and the miner, and finally wires up the API backend with a gas
// price oracle. Parts of the upstream function are elided in this excerpt
// (marked "...").
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    ...
    // Obtain the chain database under the name "chaindata".
    chainDb, err := CreateDB(ctx, config, "chaindata")
    ...
    // Set up the genesis block. A *params.ConfigCompatError is tolerated at
    // this point (it is handled after the blockchain is created below); any
    // other non-nil error aborts construction.
    chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
    if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
	return nil, genesisErr
    }
    log.Info("Initialised chain configuration", "config", chainConfig)

    // Assemble the Ethereum object from the config, the service context and
    // the freshly created database, consensus engine and bloom indexer.
    eth := &Ethereum{
		config:         config,
		chainDb:        chainDb,
		chainConfig:    chainConfig,
		eventMux:       ctx.EventMux,
		accountManager: ctx.AccountManager,
		engine:         CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb),
		shutdownChan:   make(chan bool),
		networkID:      config.NetworkId,
		gasPrice:       config.GasPrice,
		etherbase:      config.Etherbase,
		bloomRequests:  make(chan chan *bloombits.Retrieval),
		bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
    }

    log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
    ...
    // Create the blockchain with VM and cache settings taken from the config.
    var (
	vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
	cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
    )
    eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
    if err != nil {
	return nil, err
    }
    // Rewind the chain in case of an incompatible config upgrade.
    if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
	log.Warn("Rewinding chain to upgrade configuration", "err", compat)
	eth.blockchain.SetHead(compat.RewindTo)
	rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
    }
    eth.bloomIndexer.Start(eth.blockchain)

    // Create the transaction pool; a configured journal path is first
    // resolved through the service context.
    if config.TxPool.Journal != "" {
	config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
    }
    eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

    // Create the protocol manager.
    if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
	return nil, err
    }

    // Create the miner and set its extra data from the config.
    eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
    eth.miner.SetExtra(makeExtraData(config.ExtraData))

    // Create the API backend and its gas price oracle; the oracle's default
    // falls back to the configured gas price when none is set.
    eth.APIBackend = &EthAPIBackend{eth, nil}
    gpoParams := config.GPO
    if gpoParams.Default == nil {
	gpoParams.Default = config.GasPrice
    }
    eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

    return eth, nil
}

總體上看來,這個函式主要是執行了以下幾步:
1、載入創始區塊
2、初始化以太坊類
3、初始化BlockChain
4、初始化交易池
5、初始化協議管理器
6、初始化挖礦模式

四、啟動P2P網路

回到cmd/geth/main.go,在節點配置完成後執行startNode()函式。startNode()的主要功能有:

1、啟動node;
2、解鎖賬戶;
3、開啟錢包事件監聽;

// startNode starts the given node and then performs the follow-up work that
// needs a running node:
//
//  1. unlocks every account listed in the unlock flag,
//  2. subscribes to wallet events and handles them in a background goroutine
//     (opening arriving wallets, starting self-derivation on opened ones and
//     closing dropped ones),
//  3. if mining or developer mode is enabled, applies the thread count and
//     gas price from the CLI and starts mining.
//
// Unrecoverable problems are reported through utils.Fatalf.
func startNode(ctx *cli.Context, stack *node.Node) {
	debug.Memsize.Add("node", stack)

	// Start up the node itself
	utils.StartNode(stack)

	// Unlock any account specifically requested
	ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)

	passwords := utils.MakePasswordList(ctx)
	unlocks := strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name), ",")
	for i, account := range unlocks {
		// Blank entries are skipped; i still counts them, so it is the
		// position of the account in the comma-separated list.
		if trimmed := strings.TrimSpace(account); trimmed != "" {
			unlockAccount(ctx, ks, trimmed, i, passwords)
		}
	}
	// Register wallet event handlers to open and auto-derive wallets
	events := make(chan accounts.WalletEvent, 16)
	stack.AccountManager().Subscribe(events)

	// The handler goroutine runs for as long as the events channel stays open.
	go func() {
		// Create a chain state reader for self-derivation
		rpcClient, err := stack.Attach()
		if err != nil {
			utils.Fatalf("Failed to attach to self: %v", err)
		}
		stateReader := ethclient.NewClient(rpcClient)

		// Open any wallets already attached
		for _, wallet := range stack.AccountManager().Wallets() {
			if err := wallet.Open(""); err != nil {
				log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
			}
		}
		// Listen for wallet event till termination
		for event := range events {
			switch event.Kind {
			case accounts.WalletArrived:
				if err := event.Wallet.Open(""); err != nil {
					log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
				}
			case accounts.WalletOpened:
				// The status is only used for logging, so its error is ignored.
				status, _ := event.Wallet.Status()
				log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status)

				// Ledger wallets use their own base derivation path.
				derivationPath := accounts.DefaultBaseDerivationPath
				if event.Wallet.URL().Scheme == "ledger" {
					derivationPath = accounts.DefaultLedgerBaseDerivationPath
				}
				event.Wallet.SelfDerive(derivationPath, stateReader)

			case accounts.WalletDropped:
				log.Info("Old wallet dropped", "url", event.Wallet.URL())
				event.Wallet.Close()
			}
		}
	}()
	// Start auxiliary services if enabled
	if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
		// Mining only makes sense if a full Ethereum node is running
		if ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {
			utils.Fatalf("Light clients do not support mining")
		}
		var ethereum *eth.Ethereum
		if err := stack.Service(&ethereum); err != nil {
			utils.Fatalf("Ethereum service not running: %v", err)
		}
		// Use a reduced number of threads if requested
		threads := ctx.GlobalInt(utils.MinerLegacyThreadsFlag.Name)
		if ctx.GlobalIsSet(utils.MinerThreadsFlag.Name) {
			threads = ctx.GlobalInt(utils.MinerThreadsFlag.Name)
		}
		if threads > 0 {
			// Only engines that expose SetThreads can have their thread
			// count adjusted; others are left untouched.
			type threaded interface {
				SetThreads(threads int)
			}
			if th, ok := ethereum.Engine().(threaded); ok {
				th.SetThreads(threads)
			}
		}
		// Set the gas price to the limits from the CLI and start mining.
		// The override is detected with GlobalIsSet so that it matches the
		// GlobalBig lookup below and the threads override above.
		gasprice := utils.GlobalBig(ctx, utils.MinerLegacyGasPriceFlag.Name)
		if ctx.GlobalIsSet(utils.MinerGasPriceFlag.Name) {
			gasprice = utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
		}
		ethereum.TxPool().SetGasPrice(gasprice)
		if err := ethereum.StartMining(true); err != nil {
			utils.Fatalf("Failed to start mining: %v", err)
		}
	}
}

在上面的程式碼中,通過cmd/utils/cmd.go中的StartNode()函式,呼叫node/node.go中的Start()方法,啟動節點。在這個方法中,首先判斷節點是否已經在執行,然後要對p2p服務進行初始化,最後構建p2p.Server物件,執行該物件的Start()方法,使p2p服務啟動起來。

// Start brings the node up while holding n.lock for the whole call. It
// returns ErrNodeRunning if a server is already set. Otherwise it opens the
// data directory, builds the p2p server configuration, instantiates every
// registered service through its constructor, starts the p2p server with the
// services' protocols, starts each service and finally the RPC interfaces.
// If a service or the RPC layer fails to start, what was started before is
// stopped again and the error is returned.
func (n *Node) Start() error {
	n.lock.Lock()
	defer n.lock.Unlock()

	// Short circuit if the node's already running
	if n.server != nil {
		return ErrNodeRunning
	}
	if err := n.openDataDir(); err != nil {
		return err
	}

	// Initialize the p2p service: fill in serverConfig and use it to create
	// the p2p.Server instance. Static nodes, trusted nodes and the node
	// database are taken from the node config only when not set already.
	n.serverConfig = n.config.P2P
	n.serverConfig.PrivateKey = n.config.NodeKey()
	n.serverConfig.Name = n.config.NodeName()
	n.serverConfig.Logger = n.log
	if n.serverConfig.StaticNodes == nil {
		n.serverConfig.StaticNodes = n.config.StaticNodes()
	}
	if n.serverConfig.TrustedNodes == nil {
		n.serverConfig.TrustedNodes = n.config.TrustedNodes()
	}
	if n.serverConfig.NodeDatabase == "" {
		n.serverConfig.NodeDatabase = n.config.NodeDB()
	}
	running := &p2p.Server{Config: n.serverConfig}
	n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

	// Instantiate every registered service through its constructor, keyed by
	// the concrete type of the service it returns.
	services := make(map[reflect.Type]Service)
	for _, constructor := range n.serviceFuncs {
		// Create a new context for the particular service
		ctx := &ServiceContext{
			config:         n.config,
			services:       make(map[reflect.Type]Service),
			EventMux:       n.eventmux,
			AccountManager: n.accman,
		}
		for kind, s := range services { // copy needed for threaded access
			ctx.services[kind] = s
		}
		// Construct and save the service
		service, err := constructor(ctx)
		if err != nil {
			return err
		}
		// Two constructors yielding the same service type is an error.
		kind := reflect.TypeOf(service)
		if _, exists := services[kind]; exists {
			return &DuplicateServiceError{Kind: kind}
		}
		services[kind] = service
	}
	// Gather the protocols and start the freshly assembled P2P server
	for _, service := range services {
		running.Protocols = append(running.Protocols, service.Protocols()...)
	}
	if err := running.Start(); err != nil {
		return convertFileLockError(err)
	}
	// Start each of the services
	started := []reflect.Type{}
	for kind, service := range services {
		// Start the next service, stopping all previous upon failure
		if err := service.Start(running); err != nil {
			for _, kind := range started {
				services[kind].Stop()
			}
			running.Stop()

			return err
		}
		// Mark the service started for potential cleanup
		started = append(started, kind)
	}
	// Lastly start the configured RPC interfaces
	if err := n.startRPC(services); err != nil {
		// Every service is running at this point, so stop all of them
		// together with the p2p server.
		for _, service := range services {
			service.Stop()
		}
		running.Stop()
		return err
	}
	// Finish initializing the startup
	n.services = services
	n.server = running
	n.stop = make(chan struct{})

	return nil
}

至此,以太坊的啟動流程就完成了。