1. 程式人生 > 其它 >Hyperledger Fabric使用Golang SDK測試網路吞吐量

Hyperledger Fabric使用Golang SDK測試網路吞吐量

技術標籤:Fabric區塊鏈Golanggolang區塊鏈多執行緒

1. 使用github.com/panjf2000/ants

func getContract() *gateway.Contract {

	wallet, _ := gateway.NewFileSystemWallet("resources/wallet-whu-cloud-org1")

	networkConfig := filepath.Join("resources", "connection-whu-cloud-org1.yaml")
	gw, err :=
gateway.Connect( gateway.WithConfig(config.FromFile(filepath.Clean(networkConfig))), gateway.WithIdentity(wallet, "admin"), ) if err != nil { log.Panicf("Failed to create new Gateway: " + err.Error()) } defer gw.Close() nw, err := gw.GetNetwork("mychannel")
if err != nil { log.Panicf("Failed to get network: " + err.Error()) } contract := nw.GetContract("abstore") return contract } func TestGatewayWithAnts() { poolSize := 1300 //併發數 txCount : 10000 //交易數 contract := getContract() start := time.Now() var wg sync.WaitGroup p,
_ := ants.NewPoolWithFunc(poolSize , func(i interface{}) { n := i.(int) resp, _ := contract.SubmitTransaction("Set", "key_" + strconv.Itoa(int(start.Unix())) + strconv.Itoa(n), "1") //log.Printf("NumGoroutine[%d]-Resp: %s\n", runtime.NumGoroutine(), resp) wg.Done() }) defer p.Release() start = time.Now() for i := 0; i < txCount; i++ { wg.Add(1) _ = p.Invoke(i) } wg.Wait() duration:= time.Since(start) log.Printf("Time: %s\n", duration) log.Printf("TPS: %f\n", float64(txCount) / duration.Seconds()) }

執行結果:

2020/12/22 15:11:44 Time: 6.5174183s
2020/12/22 15:11:44 TPS: 1534.349882

2. 自定義WorkerPool

var(
	channelBufferSize = 10		//chan的緩衝大小
	workerPoolSize = 500		//worker數量
	txCount = 10000				//傳送交易數量
)

type Tx struct {
	id int
	args []string	//交易引數,args[0]為函式名
}

type Output struct {	//交易執行的輸出
	tx Tx
	result []byte
}

//待發送的交易池
var txs = make(chan Tx, channelBufferSize)
//交易執行的結果池
var outputs = make(chan Output, channelBufferSize)

var contract = getContract()	//不同worker共用contract物件

//傳送交易,返回執行結果
func sendTx(tx Tx) Output {
	resp, _ := contract.SubmitTransaction(tx.args[0], tx.args[1:]...)
	//resp, _ := contract.EvaluateTransaction(tx.args[0], tx.args[1:]...)
	return Output{
		tx,
		resp,
	}
}

//單個worker,從交易池中拿交易傳送
func worker(wg *sync.WaitGroup) {
	for tx := range txs{
		outputs <- sendTx(tx)
	}
	wg.Done()
}

//建立指定數量的worker,讓它們同時工作,並等待所有worker工作完成
func createWorkerPool(workerPoolSize int) {
	var wg sync.WaitGroup
	for i := 0; i < workerPoolSize; i++ {
		wg.Add(1)
		go worker(&wg)
	}
	wg.Wait()
	close(outputs)
}

// 產生交易,傳送到交易隊池
func createTxs(txCount int) {
	//t := int(time.Now().Unix())
	for i := 0; i < txCount; i++ {
		tx := Tx{
			i,
			[]string{"Set", "key_xxx_" + strconv.Itoa(i), "1"},
		}
		txs <- tx
	}
	close(txs)
}

//從交易執行輸出佇列中取出交易輸出
func getOutputs(ch chan bool) {
	for output := range outputs {//outputs沒有close的話,會一直嘗試讀取,造成阻塞
		//log.Printf("runing routines: %d, tx intput: %+v, tx output: %s\n", runtime.NumGoroutine(), output.tx, output.result)
	}
	ch <- true
}

func TestWorkerPool() {
	startTime := time.Now()
	go createTxs(txCount)
	done := make(chan bool)
	go getOutputs(done)
	createWorkerPool(workerPoolSize)
	<-done
	duration := time.Since(startTime)
	log.Println("Time ", duration.Seconds(), "seconds")
	log.Printf("TPS: %f\n", float64(txCount) / duration.Seconds())
}

執行結果:

2020/12/22 15:13:20 Time: 6.0987865s
2020/12/22 15:13:20 TPS: 1639.670449

3. 測試環境

在這裡插入圖片描述

4. 總結

  • 兩種方式效能差不多
  • 都支援併發協程數量的控制