Hyperledger Fabric使用Golang SDK測試網路吞吐量
阿新 • • 發佈:2020-12-28
技術標籤:Fabric、區塊鏈、Golang、golang、區塊鏈、多執行緒
1. 使用github.com/panjf2000/ants
// getContract connects to the Fabric network through the gateway SDK and
// returns the "abstore" contract on channel "mychannel".
//
// The "admin" identity is read from the file-system wallet at
// resources/wallet-whu-cloud-org1 and the connection profile from
// resources/connection-whu-cloud-org1.yaml. Any failure panics via log.Panicf.
//
// The gateway is deliberately NOT closed here: the returned contract submits
// transactions through it, so a deferred Close would release the connection
// as soon as this function returns, before the caller uses the contract.
// The gateway therefore lives for the remainder of the process.
func getContract() *gateway.Contract {
	wallet, err := gateway.NewFileSystemWallet("resources/wallet-whu-cloud-org1")
	if err != nil {
		log.Panicf("Failed to create wallet: %v", err)
	}

	networkConfig := filepath.Join("resources", "connection-whu-cloud-org1.yaml")
	gw, err := gateway.Connect(
		gateway.WithConfig(config.FromFile(filepath.Clean(networkConfig))),
		gateway.WithIdentity(wallet, "admin"),
	)
	if err != nil {
		// Pass the error as an argument so a '%' in its text is not
		// interpreted as a formatting verb.
		log.Panicf("Failed to create new Gateway: %v", err)
	}

	nw, err := gw.GetNetwork("mychannel")
	if err != nil {
		// Nothing will use the gateway on this path, so release it.
		gw.Close()
		log.Panicf("Failed to get network: %v", err)
	}

	return nw.GetContract("abstore")
}
func TestGatewayWithAnts() {
poolSize := 1300 //併發數
txCount : 10000 //交易數
contract := getContract()
start := time.Now()
var wg sync.WaitGroup
p, _ := ants.NewPoolWithFunc(poolSize , func(i interface{}) {
n := i.(int)
resp, _ := contract.SubmitTransaction("Set", "key_" + strconv.Itoa(int(start.Unix())) + strconv.Itoa(n), "1")
//log.Printf("NumGoroutine[%d]-Resp: %s\n", runtime.NumGoroutine(), resp)
wg.Done()
})
defer p.Release()
start = time.Now()
for i := 0; i < txCount; i++ {
wg.Add(1)
_ = p.Invoke(i)
}
wg.Wait()
duration:= time.Since(start)
log.Printf("Time: %s\n", duration)
log.Printf("TPS: %f\n", float64(txCount) / duration.Seconds())
}
執行結果:
2020/12/22 15:11:44 Time: 6.5174183s
2020/12/22 15:11:44 TPS: 1534.349882
2. 自定義WorkerPool
// Tunables for the hand-rolled worker-pool benchmark.
var (
	// channelBufferSize is the buffer capacity of the txs and outputs channels.
	channelBufferSize = 10
	// workerPoolSize is the number of worker goroutines to start.
	workerPoolSize = 500
	// txCount is the number of transactions to send.
	txCount = 10000
)
// Tx is one transaction to submit.
type Tx struct {
	// id is the sequence number assigned when the transaction is generated.
	id int
	// args holds the call: args[0] is the chaincode function name and the
	// remaining elements are its arguments.
	args []string
}
// Output pairs a transaction with the result of executing it.
type Output struct {
	// tx is the transaction that was sent.
	tx Tx
	// result is the response payload returned for tx.
	result []byte
}
// Shared state of the worker-pool benchmark.
var (
	// txs is the queue of transactions waiting to be sent.
	txs = make(chan Tx, channelBufferSize)
	// outputs is the queue of transaction execution results.
	outputs = make(chan Output, channelBufferSize)
	// contract is the single contract object shared by all workers.
	contract = getContract()
)
// sendTx submits tx through the shared contract and returns its output.
//
// tx.args[0] is the chaincode function name and tx.args[1:] are its
// arguments. A failed submission is logged and yields an Output whose result
// is whatever the SDK returned alongside the error. A transaction with no
// args is logged and skipped instead of panicking on the index.
//
// To benchmark queries rather than writes, call contract.EvaluateTransaction
// with the same arguments instead of SubmitTransaction.
func sendTx(tx Tx) Output {
	if len(tx.args) == 0 {
		log.Printf("tx %d: no function name in args, skipping", tx.id)
		return Output{tx: tx}
	}
	resp, err := contract.SubmitTransaction(tx.args[0], tx.args[1:]...)
	if err != nil {
		log.Printf("tx %d: submit failed: %v", tx.id, err)
	}
	return Output{tx: tx, result: resp}
}
// worker is a single pool worker: it receives transactions from txs, sends
// each one, and pushes the result onto outputs. Once txs is closed and
// drained it marks itself finished on wg.
func worker(wg *sync.WaitGroup) {
	for {
		next, open := <-txs
		if !open {
			break
		}
		outputs <- sendTx(next)
	}
	wg.Done()
}
// createWorkerPool starts workerPoolSize workers, blocks until all of them
// have finished, and then closes outputs so the reader of outputs can stop.
func createWorkerPool(workerPoolSize int) {
	var workers sync.WaitGroup
	for remaining := workerPoolSize; remaining > 0; remaining-- {
		workers.Add(1)
		go worker(&workers)
	}
	workers.Wait()
	close(outputs)
}
// 產生交易,傳送到交易隊池
func createTxs(txCount int) {
//t := int(time.Now().Unix())
for i := 0; i < txCount; i++ {
tx := Tx{
i,
[]string{"Set", "key_xxx_" + strconv.Itoa(i), "1"},
}
txs <- tx
}
close(txs)
}
// getOutputs drains the outputs channel and then signals completion by
// sending true on ch.
//
// The loop only ends once outputs has been closed; without that close it
// would block forever waiting for more results. The results themselves are
// discarded, so the loop binds no variable (an unused loop variable does not
// compile in Go).
func getOutputs(ch chan bool) {
	for range outputs {
	}
	ch <- true
}
// TestWorkerPool runs the worker-pool benchmark: a producer goroutine queues
// txCount transactions, workerPoolSize workers send them, and a consumer
// goroutine drains the results. It logs the elapsed time and the TPS.
func TestWorkerPool() {
	begin := time.Now()

	finished := make(chan bool)
	go createTxs(txCount)
	go getOutputs(finished)

	// Blocks until every worker has exited and outputs has been closed.
	createWorkerPool(workerPoolSize)
	<-finished

	elapsed := time.Since(begin)
	log.Println("Time ", elapsed.Seconds(), "seconds")
	log.Printf("TPS: %f\n", float64(txCount)/elapsed.Seconds())
}
執行結果:
2020/12/22 15:13:20 Time: 6.0987865s
2020/12/22 15:13:20 TPS: 1639.670449
3. 測試環境
4. 總結
- 兩種方式效能差不多(ants 約 1534 TPS,自定義 WorkerPool 約 1640 TPS)
- 都支援併發協程數量的控制