以太坊原始碼解讀(3)以太坊啟動流程簡析
啟動命令:
geth --identity "TestNode1" --datadir "data0" --rpc --rpcapi "db,eth,net,web3" --port "30303" --networkid "29382" --ws --wsorigins="*" --rpccorsdomain="*" console
啟動後,我們可以從日誌來分析程式啟動的流程。
INFO [10-29|12:27:11.737] Maximum peer count INFO [10-29|12:27:11.747] Starting peer-to-peer node // node/node.go INFO [10-29|12:27:11.747] Allocated cache and file handles // INFO [10-29|12:27:11.763] Initialised chain configuration // eth/backend.go INFO [10-29|12:27:11.763] Disk storage enabled for ethash caches // consensus/ethash/ethash.go INFO [10-29|12:27:11.763] Disk storage enabled for ethash DAGs INFO [10-29|12:27:11.763] Initialising Ethereum protocol WARN [10-29|12:27:11.765] Head state missing, repairing chain INFO [10-29|12:27:11.775] Rewound blockchain to past state INFO [10-29|12:27:11.776] Loaded most recent local header INFO [10-29|12:27:11.776] Loaded most recent local full block INFO [10-29|12:27:11.776] Loaded most recent local fast block INFO [10-29|12:27:11.776] Loaded local transaction journal INFO [10-29|12:27:11.777] Regenerated local transaction journal WARN [10-29|12:27:11.777] Blockchain not empty, fast sync disabled INFO [10-29|12:27:11.777] Starting P2P networking INFO [10-29|12:27:13.928] Mapped network port INFO [10-29|12:27:13.990] UDP listener up INFO [10-29|12:27:13.991] RLPx listener up INFO [10-29|12:27:13.995] IPC endpoint opened INFO [10-29|12:27:13.995] HTTP endpoint opened INFO [10-29|12:27:13.996] WebSocket endpoint opened INFO [10-29|12:27:14.001] Mapped network port Welcome to the Geth JavaScript console!
啟動流程圖
一、啟動的main函式 cmd/geth/main.go
Go裡面有兩個保留的函式:init函式(能夠應用於所有的package)和main函式(只能應用於package main)。這兩個函式在定義時不能有任何的引數和返回值。
在cmd/geth/main.go中,首先定義app:
cmd/geth/main.go var ( // Git SHA1 commit hash of the release (set via linker flags) gitCommit = "" // The app that holds all commands and flags. app = utils.NewApp(gitCommit, "the go-ethereum command line interface") // flags nodeFlags = []cli.Flag{ utils.IdentityFlag, // 所有這些flag都來自cmd/utils模組中 ... } rpcFlags = []cli.Flag{...} consoleFlags = []cli.Flag{...} // 來自cmd/geth/consolecmd.go whisperFlags = []cli.Flag{...} metricsFlafs = []cli.Flag{...} )
然後通過init()函式來初始化app,其中app.Action表示如果使用者沒有輸入其他的子命令的情況下,會呼叫這個欄位指向的函式,即geth()。
geth的命令使用了urfave/cli這個庫,這個庫是go語言命令列程式常用的庫,它把命令列解析的過程做了一下封裝,抽象出flag/command/subcommand這些模組,使用者只需要提供一些模組的配置,引數的解析和關聯在庫內部完成,幫助資訊也可以自動生成。
app.Flags和app.Commands分別設定了支援的[option]和[command],是當從使用者輸入命令解析出相應的引數後指向特定的函式並執行。這裡先不做介紹,後面以console的啟動為例介紹這一部分的原理。
func init() {
// Initialize the CLI app and start Geth
app.Action = geth
app.HideVersion = true // we have a command to print the version
app.Copyright = "Copyright 2013-2018 The go-ethereum Authors"
// 所有能夠支援的子命令
app.Commands = []cli.Command{
// See chaincmd.go:
initCommand,
...
// See monitorcmd.go:
// See accountcmd.go:
// See consolecmd.go:
// See misccmd.go:
// See config.go
}
sort.Sort(cli.CommandsByName(app.Commands))
// 所有能夠解析的Options
app.Flags = append(app.Flags, nodeFlags...)
app.Flags = append(app.Flags, rpcFlags...)
app.Flags = append(app.Flags, consoleFlags...)
app.Flags = append(app.Flags, debug.Flags...)
app.Flags = append(app.Flags, whisperFlags...)
app.Flags = append(app.Flags, metricsFlags...)
app.Before = func(ctx *cli.Context) error {
...
}
app.After = func(ctx *cli.Context) error {
debug.Exit()
console.Stdin.Close() // Resets terminal mode.
return nil
}
}
通過上面的程式碼就把我們解析使用者命令的物件設定完成了,下一步就是執行app.Run()。
func main() {
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
在以太坊客戶端geth中,如果什麼命令都不輸入直接執行geth, 就會預設啟動一個全節點模式的節點,連線到主網路。這時候就是按照上面所說的,啟動了geth()函式:
cmd/geth/main.go
func geth(ctx *cli.Context) error {
if args := ctx.Args(); len(args) > 0 {
return fmt.Errorf("invalid command: %q", args[0])
}
node := makeFullNode(ctx) // 定義全節點物件
startNode(ctx, node) // 啟動全節點
node.Wait()
return nil
}
二、全節點配置
在cmd/geth/main.go中有一個startNode()函式用來啟動全節點,首先呼叫cmd/geth/config.go中的makeFullNode()函式:
func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx) // 進行節點配置
utils.RegisterEthService(stack, &cfg.Eth) // 註冊eth服務
if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
}
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
shhEnabled := enableWhisper(ctx)
shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
if shhEnabled || shhAutoEnabled {
if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
}
if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
}
utils.RegisterShhService(stack, &cfg.Shh)
}
// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
}
return stack
}
可以看出,makeFullNode首先通過makeConfigNode(ctx) 對節點進行配置,包括Eth、Shh、Node、Dashboard,返回Node和geth配置,然後開啟兩條路線:1、通過Node.Start()——>Server.Start()啟動p2p服務;2、通過RegisterEthService將Ethereum服務註冊到Node的services map[reflect.Type]Service中,通過Node.Start()來啟動Ethereum服務。
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
// Load defaults.
cfg := gethConfig{
Eth: eth.DefaultConfig,
Shh: whisper.DefaultConfig,
Node: defaultNodeConfig(),
Dashboard: dashboard.DefaultConfig,
}
// Load config file.
if file := ctx.GlobalString(configFileFlag.Name); file != "" {
if err := loadConfig(file, &cfg); err != nil {
utils.Fatalf("%v", err)
}
}
// Apply flags.
utils.SetNodeConfig(ctx, &cfg.Node)
stack, err := node.New(&cfg.Node)
if err != nil {
utils.Fatalf("Failed to create the protocol stack: %v", err)
}
utils.SetEthConfig(ctx, stack, &cfg.Eth)
if ctx.GlobalIsSet(utils.EthStatsURLFlag.Name) {
cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)
}
utils.SetShhConfig(ctx, stack, &cfg.Shh)
utils.SetDashboardConfig(ctx, &cfg.Dashboard)
return stack, cfg
}
三、註冊ETH服務
makeFullNode()函式裡在做好節點配置之後,再呼叫cmd/utils/flag.go裡RegisterEthService()函式註冊eth服務,而這個函式是呼叫了node/node.go模組中的Register()方法 ,這個方法的引數是一個建構函式(constructor),正如下面的程式碼所示:
cmd/utils/flag.go
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
var err error
if cfg.SyncMode == downloader.LightSync {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return les.New(ctx, cfg)
})
} else {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
})
}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}
}
這個函式裡會判斷同步的方式 ,如果是LightSync則會使用les.New()建立輕節點,否則就使用eth.New()建立全節點,這裡我們還是建立全節點,即呼叫eth.New()方法。
eth/backend.go
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
...
chainDb, err := CreateDB(ctx, config, "chaindata")
...
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb),
shutdownChan: make(chan bool),
networkID: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
...
var (
vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
eth.bloomIndexer.Start(eth.blockchain)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
eth.APIBackend = &EthAPIBackend{eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
return eth, nil
}
總體上看來,這個函式主要是執行了以下幾步:
1、載入創始區塊
2、初始化以太坊類
3、初始化BlookChain
4、初始化交易池
5、初始化協議管理器
6、初始化挖礦模式
四、啟動P2P網路
回到cmd/geth/main.go,在節點配置完成後執行startNode()函式。startNode()的主要功能有:
1、啟動node;
2、解鎖賬戶;
3、開啟錢包事件監聽;
func startNode(ctx *cli.Context, stack *node.Node) {
debug.Memsize.Add("node", stack)
// Start up the node itself
utils.StartNode(stack)
// Unlock any account specifically requested
ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
passwords := utils.MakePasswordList(ctx)
unlocks := strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name), ",")
for i, account := range unlocks {
if trimmed := strings.TrimSpace(account); trimmed != "" {
unlockAccount(ctx, ks, trimmed, i, passwords)
}
}
// Register wallet event handlers to open and auto-derive wallets
events := make(chan accounts.WalletEvent, 16)
stack.AccountManager().Subscribe(events)
go func() {
// Create a chain state reader for self-derivation
rpcClient, err := stack.Attach()
if err != nil {
utils.Fatalf("Failed to attach to self: %v", err)
}
stateReader := ethclient.NewClient(rpcClient)
// Open any wallets already attached
for _, wallet := range stack.AccountManager().Wallets() {
if err := wallet.Open(""); err != nil {
log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
}
}
// Listen for wallet event till termination
for event := range events {
switch event.Kind {
case accounts.WalletArrived:
if err := event.Wallet.Open(""); err != nil {
log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
}
case accounts.WalletOpened:
status, _ := event.Wallet.Status()
log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status)
derivationPath := accounts.DefaultBaseDerivationPath
if event.Wallet.URL().Scheme == "ledger" {
derivationPath = accounts.DefaultLedgerBaseDerivationPath
}
event.Wallet.SelfDerive(derivationPath, stateReader)
case accounts.WalletDropped:
log.Info("Old wallet dropped", "url", event.Wallet.URL())
event.Wallet.Close()
}
}
}()
// Start auxiliary services if enabled
if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
// Mining only makes sense if a full Ethereum node is running
if ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {
utils.Fatalf("Light clients do not support mining")
}
var ethereum *eth.Ethereum
if err := stack.Service(ðereum); err != nil {
utils.Fatalf("Ethereum service not running: %v", err)
}
// Use a reduced number of threads if requested
threads := ctx.GlobalInt(utils.MinerLegacyThreadsFlag.Name)
if ctx.GlobalIsSet(utils.MinerThreadsFlag.Name) {
threads = ctx.GlobalInt(utils.MinerThreadsFlag.Name)
}
if threads > 0 {
type threaded interface {
SetThreads(threads int)
}
if th, ok := ethereum.Engine().(threaded); ok {
th.SetThreads(threads)
}
}
// Set the gas price to the limits from the CLI and start mining
gasprice := utils.GlobalBig(ctx, utils.MinerLegacyGasPriceFlag.Name)
if ctx.IsSet(utils.MinerGasPriceFlag.Name) {
gasprice = utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
}
ethereum.TxPool().SetGasPrice(gasprice)
if err := ethereum.StartMining(true); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
}
}
在上面的程式碼中,通過cmd/utils/cmd.go中的StartNode()函式,呼叫node/node.go中的Start()方法,啟動節點。在這個方法中,首先判斷節點是否已經在執行,然後要對p2p服務進行初始化,最後構建p2p.Server物件,執行該物件的Start()方法,使p2p服務啟動起來。
func (n *Node) Start() error {
n.lock.Lock()
defer n.lock.Unlock()
// Short circuit if the node's already running
if n.server != nil {
return ErrNodeRunning
}
if err := n.openDataDir(); err != nil {
return err
}
// 初始化p2p服務,配置serverConfig,並以此穿件p2p.Server例項
n.serverConfig = n.config.P2P
n.serverConfig.PrivateKey = n.config.NodeKey()
n.serverConfig.Name = n.config.NodeName()
n.serverConfig.Logger = n.log
if n.serverConfig.StaticNodes == nil {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
if n.serverConfig.TrustedNodes == nil {
n.serverConfig.TrustedNodes = n.config.TrustedNodes()
}
if n.serverConfig.NodeDatabase == "" {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
running := &p2p.Server{Config: n.serverConfig}
n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
// Otherwise copy and specialize the P2P configuration
services := make(map[reflect.Type]Service)
for _, constructor := range n.serviceFuncs {
// Create a new context for the particular service
ctx := &ServiceContext{
config: n.config,
services: make(map[reflect.Type]Service),
EventMux: n.eventmux,
AccountManager: n.accman,
}
for kind, s := range services { // copy needed for threaded access
ctx.services[kind] = s
}
// Construct and save the service
service, err := constructor(ctx)
if err != nil {
return err
}
kind := reflect.TypeOf(service)
if _, exists := services[kind]; exists {
return &DuplicateServiceError{Kind: kind}
}
services[kind] = service
}
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
if err := running.Start(); err != nil {
return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
for _, kind := range started {
services[kind].Stop()
}
running.Stop()
return err
}
// Mark the service started for potential cleanup
started = append(started, kind)
}
// Lastly start the configured RPC interfaces
if err := n.startRPC(services); err != nil {
for _, service := range services {
service.Stop()
}
running.Stop()
return err
}
// Finish initializing the startup
n.services = services
n.server = running
n.stop = make(chan struct{})
return nil
}
至此,以太坊的啟動流程就完成了。