golang 實現一個通用協程池
阿新 • • 發佈:2019-02-05
golang 是一門很優秀的語言,語法簡單,功能強大 ,支援的 channal、goroutine 等都是非常優秀的特性。由於之前用golang 重構一個專案,對golang不是太瞭解,栽了不少坑,其中主要問題為:
1. go 直接協程執行函式、方法,大併發的時候不太可控會導致協程數量急劇增加。
2.協程池方式執行有不想每一個結構體都啟動一個協程池
所以就萌生出搞一個通用協程池的想法,主要思想為,啟動多個協程 從 channal 佇列讀取資料,由業務將需要執行的方法與引數放入channal,協程池僅僅負責維護協程,自動擴容,縮減,啟用備用佇列等策略,至於返回結果之類的都有業務方實現。
關鍵實現:
1. 定義一個task 的結構體 標示具體要執行的任務格式
type Job func([]interface{})
type taskWork struct {
Run Job
startBool bool
params []interface{}
}
2.定義一個worker 池,控制協程相關資訊
type WorkPool struct {
taskPool chan taskWork
workNum int
maxNum int
stopTopic bool
//考慮後期 作為冗餘佇列使用
taskQue chan taskWork
}
3.實現協程池相關啟動,停止,擴容策略,縮減策略,備用佇列啟用等 邏輯
//得到一個執行緒池並返回 控制代碼 func (p *WorkPool) InitPool() { *p = WorkPool{workNum: workerNumDefault, maxNum: workerNumMax, stopTopic: false, taskPool: make(chan taskWork, workerNumDefault*2), taskQue: nil} (p).start() go (p).workerRemoveConf() } //開始work func (p *WorkPool) start() { for i := 0; i < workerNumDefault; i++ { p.workInit(i) fmt.Println("start pool task:", i) } } //初始化 work池 後期應該考慮如何 自動 增減協程數,以達到最優 func (p *WorkPool) workInit(id int) { go func(idNum int) { //var i int = 0 for { select { case task := <-p.taskPool: if task.startBool == true && task.Run != nil { //fmt.Print("this is pool ", idNum, "---") task.Run(task.params) } //單個結束任務 if task.startBool == false { //fmt.Print("this is pool -- ", idNum, "---") return } //防止從channal 中讀取資料超時 case <-time.After(time.Millisecond * 1000): //fmt.Println("time out init") if p.stopTopic == true && len(p.taskPool) == 0 { fmt.Println("topic=", p.stopTopic) //work數遞減 p.workNum-- return } //從備用佇列讀取資料 case queTask := <-p.taskQue: if queTask.startBool == true && queTask.Run != nil { //fmt.Print("this is que ", idNum, "---") queTask.Run(queTask.params) } } } }(id) } //停止一個workPool func (p *WorkPool) Stop() { p.stopTopic = true } //普通執行例項,非自動擴充 func (p *WorkPool) Run(funcJob Job, params ...interface{}) { p.taskPool <- taskWork{funcJob, true, params} } //用select 去做 實現 自動擴充 協程個數 啟用備用佇列等特性 func (p *WorkPool) RunAuto(funcJob Job, params ...interface{}) { task := taskWork{funcJob, true, params} select { //正常寫入 case p.taskPool <- task: //寫入超時 說明佇列滿了 寫入備用佇列 case <-time.After(time.Millisecond * 1000): p.taskQueInit() p.workerAddConf() //task 入備用佇列 p.taskQue <- task } } //自動初始化備用佇列 func (p *WorkPool) taskQueInit() { //擴充佇列 if p.taskQue == nil { p.taskQue = make(chan taskWork, p.maxNum*2) } } //自動擴充協程 簡單的自動擴充策略 func (p *WorkPool) workerAddConf() { //說明需要擴充程序 協程數量小於 1000 協程數量成倍增長 if p.workNum < 1000 { p.workerAdd(p.workNum) } else if p.workNum < p.maxNum { tmpNum := p.maxNum - p.workNum tmpNum = tmpNum / 10 if tmpNum == 0 { tmpNum = 1 } p.workerAdd(1) } } //自動縮減協程 實現比較粗糙,可以考慮後續精細實現一些策略 func (p *WorkPool) workerRemoveConf() { for { select { case <-time.After(time.Millisecond * 1000 * 600): if p.workNum > workerNumDefault && len(p.taskPool) == 0 && len(p.taskQue) == 0 { rmNum := (p.workNum - workerNumDefault) / 5 if rmNum == 0 { rmNum = 1 } p.workerRemove(rmNum) } } } } func (p *WorkPool) workerAdd(num int) { for i := 0; i < num; i++ { p.workNum++ p.workInit(p.workNum) } } func (p *WorkPool) workerRemove(num int) { for i := 0; i < num; i++ { task := taskWork{startBool: false} p.taskPool <- task p.workNum-- } }
4.我們來看一下使用的demo,可以很方便的把一個已有業務作為task 交給協程池執行了
package main
import (
"fmt"
"github.com/wangyaofenghist/go-Call/call"
"github.com/wangyaofenghist/go-Call/test"
"github.com/wangyaofenghist/go-worker-base/worker"
"runtime"
"time"
)
//宣告一號池子
var poolOne worker.WorkPool
//宣告回撥變數
var funcs call.CallMap
//以結構體方式呼叫
type runWorker struct{}
//初始化協程池 和回撥引數
func init() {
poolOne.InitPool()
funcs = call.CreateCall()
}
//通用回撥
func (f *runWorker) Run(param []interface{}) {
name := param[0].(string)
//呼叫回撥並拿回結果
result, err := funcs.Call(name, param[1:]...)
fmt.Println(result, err)
}
func runtimeNum() {
for {
fmt.Println("runtime num:", runtime.NumGoroutine())
time.Sleep(time.Millisecond * 1000)
}
}
//主函式
func main() {
tmp := make(chan int)
go runtimeNum()
var runFunc runWorker = runWorker{}
funcs.AddCall("test4", test.Test4)
var startTime = time.Now().UnixNano()
for i := 0; i < 10000; i++ {
poolOne.RunAuto(runFunc.Run, "test4", " ee ", " ff")
}
<-tmp
return
}
以上通用協程池實現略微粗糙,並沒有考慮太精細化的自動擴充協程策略或縮減策略,demo中可以將需要回調的函式加上sleep 會看到協程自動擴充與銷燬的過程,中間涉及到一個通用回撥是通過golang 的反射機制實現的一段通用程式碼,可以從gitHub 中拉取。