go-ethereum原始碼解析-miner挖礦部分原始碼分析CPU挖礦
阿新 • • 發佈:2019-01-23
## agentagent 是具體執行挖礦的物件。 它執行的流程就是,接受計算好了的區塊頭, 計算mixhash和nonce, 把挖礦好的區塊頭返回。
構造CpuAgent, 一般情況下不會使用CPU來進行挖礦,一般來說挖礦都是使用的專門的GPU進行挖礦, GPU挖礦的程式碼不會在這裡體現。
type CpuAgent struct { mu sync.Mutex workCh chan *Work // 接受挖礦任務的通道 stop chan struct{} quitCurrentOp chan struct{} returnCh chan<- *Result // 挖礦完成後的返回channel chain consensus.ChainReader // 獲取區塊鏈的資訊 engine consensus.Engine // 一致性引擎,這裡指的是Pow引擎 isMining int32 // isMining indicates whether the agent is currently mining } func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent { miner := &CpuAgent{ chain: chain, engine: engine, stop: make(chan struct{}, 1), workCh: make(chan *Work, 1), } return miner }
設定返回值channel和得到Work的channel, 方便外界傳值和得到返回資訊。
func (self *CpuAgent) Work() chan<- *Work { return self.workCh } func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch }
啟動和訊息迴圈,如果已經啟動挖礦,那麼直接退出, 否則啟動update 這個goroutineupdate 從workCh接受任務,進行挖礦,或者是接受退出資訊,退出。 func (self *CpuAgent) Start() { if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) { return // agent already started } go self.update() } func (self *CpuAgent) update() { out: for { select { case work := <-self.workCh: self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) } self.quitCurrentOp = make(chan struct{}) go self.mine(work, self.quitCurrentOp) self.mu.Unlock() case <-self.stop: self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) self.quitCurrentOp = nil } self.mu.Unlock() break out } } }
mine, 挖礦,呼叫一致性引擎進行挖礦, 如果挖礦成功,把訊息傳送到returnCh上面。 func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil { log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash()) self.returnCh <- &Result{work, result} } else { if err != nil { log.Warn("Block sealing failed", "err", err) } self.returnCh <- nil } }GetHashRate, 這個函式返回當前的HashRate。
func (self *CpuAgent) GetHashRate() int64 { if pow, ok := self.engine.(consensus.PoW); ok { return int64(pow.Hashrate()) } return 0 }
## remote_agent remote_agent 提供了一套RPC介面,可以實現遠端礦工進行採礦的功能。 比如我有一個礦機,礦機內部沒有執行以太坊節點,礦機首先從remote_agent獲取當前的任務,然後進行挖礦計算,當挖礦完成後,提交計算結果,完成挖礦。
資料結構和構造
type RemoteAgent struct { mu sync.Mutex quitCh chan struct{} workCh chan *Work // 接受任務 returnCh chan<- *Result // 結果返回 chain consensus.ChainReader engine consensus.Engine currentWork *Work //當前的任務 work map[common.Hash]*Work // 當前還沒有提交的任務,正在計算 hashrateMu sync.RWMutex hashrate map[common.Hash]hashrate // 正在計算的任務的hashrate running int32 // running indicates whether the agent is active. Call atomically } func NewRemoteAgent(chain consensus.ChainReader, engine consensus.Engine) *RemoteAgent { return &RemoteAgent{ chain: chain, engine: engine, work: make(map[common.Hash]*Work), hashrate: make(map[common.Hash]hashrate), } }
啟動和停止 func (a *RemoteAgent) Start() { if !atomic.CompareAndSwapInt32(&a.running, 0, 1) { return } a.quitCh = make(chan struct{}) a.workCh = make(chan *Work, 1) go a.loop(a.workCh, a.quitCh) } func (a *RemoteAgent) Stop() { if !atomic.CompareAndSwapInt32(&a.running, 1, 0) { return } close(a.quitCh) close(a.workCh) }得到輸入輸出的channel,這個和agent.go一樣。
func (a *RemoteAgent) Work() chan<- *Work { return a.workCh } func (a *RemoteAgent) SetReturnCh(returnCh chan<- *Result) { a.returnCh = returnCh }
loop方法,和agent.go裡面做的工作比較類似, 當接收到任務的時候,就存放在currentWork欄位裡面。 如果84秒還沒有完成一個工作,那麼就刪除這個工作, 如果10秒沒有收到hashrate的報告,那麼刪除這個追蹤/。 // loop monitors mining events on the work and quit channels, updating the internal // state of the rmeote miner until a termination is requested. // // Note, the reason the work and quit channels are passed as parameters is because // RemoteAgent.Start() constantly recreates these channels, so the loop code cannot // assume data stability in these member fields. func (a *RemoteAgent) loop(workCh chan *Work, quitCh chan struct{}) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-quitCh: return case work := <-workCh: a.mu.Lock() a.currentWork = work a.mu.Unlock() case <-ticker.C: // cleanup a.mu.Lock() for hash, work := range a.work { if time.Since(work.createdAt) > 7*(12*time.Second) { delete(a.work, hash) } } a.mu.Unlock() a.hashrateMu.Lock() for id, hashrate := range a.hashrate { if time.Since(hashrate.ping) > 10*time.Second { delete(a.hashrate, id) } } a.hashrateMu.Unlock() } } }
GetWork,這個方法由遠端礦工呼叫,獲取當前的挖礦任務。 func (a *RemoteAgent) GetWork() ([3]string, error) { a.mu.Lock() defer a.mu.Unlock() var res [3]string if a.currentWork != nil { block := a.currentWork.Block res[0] = block.HashNoNonce().Hex() seedHash := ethash.SeedHash(block.NumberU64()) res[1] = common.BytesToHash(seedHash).Hex() // Calculate the "target" to be returned to the external miner n := big.NewInt(1) n.Lsh(n, 255) n.Div(n, block.Difficulty()) n.Lsh(n, 1) res[2] = common.BytesToHash(n.Bytes()).Hex() a.work[block.HashNoNonce()] = a.currentWork return res, nil } return res, errors.New("No work available yet, don't panic.") }
SubmitWork, 遠端礦工會呼叫這個方法提交挖礦的結果。 對結果進行驗證之後提交到returnCh
// SubmitWork tries to inject a pow solution into the remote agent, returning // whether the solution was accepted or not (not can be both a bad pow as well as // any other error, like no work pending). func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool { a.mu.Lock() defer a.mu.Unlock() // Make sure the work submitted is present work := a.work[hash] if work == nil { log.Info("Work submitted but none pending", "hash", hash) return false } // Make sure the Engine solutions is indeed valid result := work.Block.Header() result.Nonce = nonce result.MixDigest = mixDigest if err := a.engine.VerifySeal(a.chain, result); err != nil { log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err) return false } block := work.Block.WithSeal(result) // Solutions seems to be valid, return to the miner and notify acceptance a.returnCh <- &Result{work, block} delete(a.work, hash) return true }
SubmitHashrate, 提交hash算力
func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) { a.hashrateMu.Lock() defer a.hashrateMu.Unlock() a.hashrate[id] = hashrate{time.Now(), rate} }
## unconfirmed
unconfirmed是一個數據結構,用來跟蹤使用者本地的挖礦資訊的,比如挖出了一個塊,那麼等待足夠的後續區塊確認之後(5個),再檢視本地挖礦的區塊是否包含在規範的區塊鏈內部。
資料結構 // headerRetriever is used by the unconfirmed block set to verify whether a previously // mined block is part of the canonical chain or not. // headerRetriever由未確認的塊組使用,以驗證先前挖掘的塊是否是規範鏈的一部分。 type headerRetriever interface { // GetHeaderByNumber retrieves the canonical header associated with a block number. GetHeaderByNumber(number uint64) *types.Header } // unconfirmedBlock is a small collection of metadata about a locally mined block // that is placed into a unconfirmed set for canonical chain inclusion tracking. // unconfirmedBlock 是本地挖掘區塊的一個小的元資料的集合,用來放入未確認的集合用來追蹤本地挖掘的區塊是否被包含進入規範的區塊鏈 type unconfirmedBlock struct { index uint64 hash common.Hash } // unconfirmedBlocks implements a data structure to maintain locally mined blocks // have have not yet reached enough maturity to guarantee chain inclusion. It is // used by the miner to provide logs to the user when a previously mined block // has a high enough guarantee to not be reorged out of te canonical chain. // unconfirmedBlocks 實現了一個數據結構,用來管理本地挖掘的區塊,這些區塊還沒有達到足夠的信任度來證明他們已經被規範的區塊連結受。 它用來給礦工提供資訊,以便他們瞭解他們之前挖到的區塊是否被包含進入了規範的區塊鏈。 type unconfirmedBlocks struct { chain headerRetriever // Blockchain to verify canonical status through 需要驗證的區塊鏈 用這個介面來獲取當前的規範的區塊頭資訊 depth uint // Depth after which to discard previous blocks 經過多少個區塊之後丟棄之前的區塊 blocks *ring.Ring // Block infos to allow canonical chain cross checks // 區塊資訊,以允許規範鏈交叉檢查 lock sync.RWMutex // Protects the fields from concurrent access }
// newUnconfirmedBlocks returns new data structure to track currently unconfirmed blocks. func newUnconfirmedBlocks(chain headerRetriever, depth uint) *unconfirmedBlocks { return &unconfirmedBlocks{ chain: chain, depth: depth, } }
插入跟蹤區塊, 當礦工挖到一個區塊的時候呼叫, index是區塊的高度, hash是區塊的hash值。 // Insert adds a new block to the set of unconfirmed ones. func (set *unconfirmedBlocks) Insert(index uint64, hash common.Hash) { // If a new block was mined locally, shift out any old enough blocks // 如果一個本地的區塊挖到了,那麼移出已經超過depth的區塊 set.Shift(index) // Create the new item as its own ring // 迴圈佇列的操作。 item := ring.New(1) item.Value = &unconfirmedBlock{ index: index, hash: hash, } // Set as the initial ring or append to the end set.lock.Lock() defer set.lock.Unlock() if set.blocks == nil { set.blocks = item } else { // 移動到迴圈佇列的最後一個元素插入item set.blocks.Move(-1).Link(item) } // Display a log for the user to notify of a new mined block unconfirmed log.Info("
構造CpuAgent, 一般情況下不會使用CPU來進行挖礦,一般來說挖礦都是使用的專門的GPU進行挖礦, GPU挖礦的程式碼不會在這裡體現。
type CpuAgent struct { mu sync.Mutex workCh chan *Work // 接受挖礦任務的通道 stop chan struct{} quitCurrentOp chan struct{} returnCh chan<- *Result // 挖礦完成後的返回channel chain consensus.ChainReader // 獲取區塊鏈的資訊 engine consensus.Engine // 一致性引擎,這裡指的是Pow引擎 isMining int32 // isMining indicates whether the agent is currently mining } func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent { miner := &CpuAgent{ chain: chain, engine: engine, stop: make(chan struct{}, 1), workCh: make(chan *Work, 1), } return miner }
設定返回值channel和得到Work的channel, 方便外界傳值和得到返回資訊。
func (self *CpuAgent) Work() chan<- *Work { return self.workCh } func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch }
啟動和訊息迴圈,如果已經啟動挖礦,那麼直接退出, 否則啟動update 這個goroutineupdate 從workCh接受任務,進行挖礦,或者是接受退出資訊,退出。 func (self *CpuAgent) Start() { if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) { return // agent already started } go self.update() } func (self *CpuAgent) update() { out: for { select { case work := <-self.workCh: self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) } self.quitCurrentOp = make(chan struct{}) go self.mine(work, self.quitCurrentOp) self.mu.Unlock() case <-self.stop: self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) self.quitCurrentOp = nil } self.mu.Unlock() break out } } }
mine, 挖礦,呼叫一致性引擎進行挖礦, 如果挖礦成功,把訊息傳送到returnCh上面。 func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil { log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash()) self.returnCh <- &Result{work, result} } else { if err != nil { log.Warn("Block sealing failed", "err", err) } self.returnCh <- nil } }GetHashRate, 這個函式返回當前的HashRate。
func (self *CpuAgent) GetHashRate() int64 { if pow, ok := self.engine.(consensus.PoW); ok { return int64(pow.Hashrate()) } return 0 }
## remote_agent
資料結構和構造
type RemoteAgent struct { mu sync.Mutex quitCh chan struct{} workCh chan *Work // 接受任務 returnCh chan<- *Result // 結果返回 chain consensus.ChainReader engine consensus.Engine currentWork *Work //當前的任務 work map[common.Hash]*Work // 當前還沒有提交的任務,正在計算 hashrateMu sync.RWMutex hashrate map[common.Hash]hashrate // 正在計算的任務的hashrate running int32 // running indicates whether the agent is active. Call atomically } func NewRemoteAgent(chain consensus.ChainReader, engine consensus.Engine) *RemoteAgent { return &RemoteAgent{ chain: chain, engine: engine, work: make(map[common.Hash]*Work), hashrate: make(map[common.Hash]hashrate), } }
啟動和停止 func (a *RemoteAgent) Start() { if !atomic.CompareAndSwapInt32(&a.running, 0, 1) { return } a.quitCh = make(chan struct{}) a.workCh = make(chan *Work, 1) go a.loop(a.workCh, a.quitCh) } func (a *RemoteAgent) Stop() { if !atomic.CompareAndSwapInt32(&a.running, 1, 0) { return } close(a.quitCh) close(a.workCh) }得到輸入輸出的channel,這個和agent.go一樣。
func (a *RemoteAgent) Work() chan<- *Work { return a.workCh } func (a *RemoteAgent) SetReturnCh(returnCh chan<- *Result) { a.returnCh = returnCh }
loop方法,和agent.go裡面做的工作比較類似, 當接收到任務的時候,就存放在currentWork欄位裡面。 如果84秒還沒有完成一個工作,那麼就刪除這個工作, 如果10秒沒有收到hashrate的報告,那麼刪除這個追蹤/。 // loop monitors mining events on the work and quit channels, updating the internal // state of the rmeote miner until a termination is requested. // // Note, the reason the work and quit channels are passed as parameters is because // RemoteAgent.Start() constantly recreates these channels, so the loop code cannot // assume data stability in these member fields. func (a *RemoteAgent) loop(workCh chan *Work, quitCh chan struct{}) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-quitCh: return case work := <-workCh: a.mu.Lock() a.currentWork = work a.mu.Unlock() case <-ticker.C: // cleanup a.mu.Lock() for hash, work := range a.work { if time.Since(work.createdAt) > 7*(12*time.Second) { delete(a.work, hash) } } a.mu.Unlock() a.hashrateMu.Lock() for id, hashrate := range a.hashrate { if time.Since(hashrate.ping) > 10*time.Second { delete(a.hashrate, id) } } a.hashrateMu.Unlock() } } }
GetWork,這個方法由遠端礦工呼叫,獲取當前的挖礦任務。 func (a *RemoteAgent) GetWork() ([3]string, error) { a.mu.Lock() defer a.mu.Unlock() var res [3]string if a.currentWork != nil { block := a.currentWork.Block res[0] = block.HashNoNonce().Hex() seedHash := ethash.SeedHash(block.NumberU64()) res[1] = common.BytesToHash(seedHash).Hex() // Calculate the "target" to be returned to the external miner n := big.NewInt(1) n.Lsh(n, 255) n.Div(n, block.Difficulty()) n.Lsh(n, 1) res[2] = common.BytesToHash(n.Bytes()).Hex() a.work[block.HashNoNonce()] = a.currentWork return res, nil } return res, errors.New("No work available yet, don't panic.") }
SubmitWork, 遠端礦工會呼叫這個方法提交挖礦的結果。 對結果進行驗證之後提交到returnCh
// SubmitWork tries to inject a pow solution into the remote agent, returning // whether the solution was accepted or not (not can be both a bad pow as well as // any other error, like no work pending). func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool { a.mu.Lock() defer a.mu.Unlock() // Make sure the work submitted is present work := a.work[hash] if work == nil { log.Info("Work submitted but none pending", "hash", hash) return false } // Make sure the Engine solutions is indeed valid result := work.Block.Header() result.Nonce = nonce result.MixDigest = mixDigest if err := a.engine.VerifySeal(a.chain, result); err != nil { log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err) return false } block := work.Block.WithSeal(result) // Solutions seems to be valid, return to the miner and notify acceptance a.returnCh <- &Result{work, block} delete(a.work, hash) return true }
SubmitHashrate, 提交hash算力
func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) { a.hashrateMu.Lock() defer a.hashrateMu.Unlock() a.hashrate[id] = hashrate{time.Now(), rate} }
## unconfirmed
unconfirmed是一個數據結構,用來跟蹤使用者本地的挖礦資訊的,比如挖出了一個塊,那麼等待足夠的後續區塊確認之後(5個),再檢視本地挖礦的區塊是否包含在規範的區塊鏈內部。
資料結構 // headerRetriever is used by the unconfirmed block set to verify whether a previously // mined block is part of the canonical chain or not. // headerRetriever由未確認的塊組使用,以驗證先前挖掘的塊是否是規範鏈的一部分。 type headerRetriever interface { // GetHeaderByNumber retrieves the canonical header associated with a block number. GetHeaderByNumber(number uint64) *types.Header } // unconfirmedBlock is a small collection of metadata about a locally mined block // that is placed into a unconfirmed set for canonical chain inclusion tracking. // unconfirmedBlock 是本地挖掘區塊的一個小的元資料的集合,用來放入未確認的集合用來追蹤本地挖掘的區塊是否被包含進入規範的區塊鏈 type unconfirmedBlock struct { index uint64 hash common.Hash } // unconfirmedBlocks implements a data structure to maintain locally mined blocks // have have not yet reached enough maturity to guarantee chain inclusion. It is // used by the miner to provide logs to the user when a previously mined block // has a high enough guarantee to not be reorged out of te canonical chain. // unconfirmedBlocks 實現了一個數據結構,用來管理本地挖掘的區塊,這些區塊還沒有達到足夠的信任度來證明他們已經被規範的區塊連結受。 它用來給礦工提供資訊,以便他們瞭解他們之前挖到的區塊是否被包含進入了規範的區塊鏈。 type unconfirmedBlocks struct { chain headerRetriever // Blockchain to verify canonical status through 需要驗證的區塊鏈 用這個介面來獲取當前的規範的區塊頭資訊 depth uint // Depth after which to discard previous blocks 經過多少個區塊之後丟棄之前的區塊 blocks *ring.Ring // Block infos to allow canonical chain cross checks // 區塊資訊,以允許規範鏈交叉檢查 lock sync.RWMutex // Protects the fields from concurrent access }
// newUnconfirmedBlocks returns new data structure to track currently unconfirmed blocks. func newUnconfirmedBlocks(chain headerRetriever, depth uint) *unconfirmedBlocks { return &unconfirmedBlocks{ chain: chain, depth: depth, } }
插入跟蹤區塊, 當礦工挖到一個區塊的時候呼叫, index是區塊的高度, hash是區塊的hash值。 // Insert adds a new block to the set of unconfirmed ones. func (set *unconfirmedBlocks) Insert(index uint64, hash common.Hash) { // If a new block was mined locally, shift out any old enough blocks // 如果一個本地的區塊挖到了,那麼移出已經超過depth的區塊 set.Shift(index) // Create the new item as its own ring // 迴圈佇列的操作。 item := ring.New(1) item.Value = &unconfirmedBlock{ index: index, hash: hash, } // Set as the initial ring or append to the end set.lock.Lock() defer set.lock.Unlock() if set.blocks == nil { set.blocks = item } else { // 移動到迴圈佇列的最後一個元素插入item set.blocks.Move(-1).Link(item) } // Display a log for the user to notify of a new mined block unconfirmed log.Info("