1. 程式人生 > >golang 實現一個通用協程池

golang 實現一個通用協程池

golang 是一門很優秀的語言,語法簡單,功能強大 ,支援的 channal、goroutine 等都是非常優秀的特性。由於之前用golang 重構一個專案,對golang不是太瞭解,栽了不少坑,其中主要問題為:

1. go 直接協程執行函式、方法,大併發的時候不太可控會導致協程數量急劇增加。

2.協程池方式執行有不想每一個結構體都啟動一個協程池

所以就萌生出搞一個通用協程池的想法,主要思想為,啟動多個協程 從 channal 佇列讀取資料,由業務將需要執行的方法與引數放入channal,協程池僅僅負責維護協程,自動擴容,縮減,啟用備用佇列等策略,至於返回結果之類的都有業務方實現。

關鍵實現:

1. 定義一個task 的結構體 標示具體要執行的任務格式

type Job func([]interface{})
type taskWork struct {
	Run       Job
	startBool bool
	params    []interface{}
}

2.定義一個worker 池,控制協程相關資訊

type WorkPool struct {
	taskPool  chan taskWork
	workNum   int
	maxNum    int
	stopTopic bool
	//考慮後期 作為冗餘佇列使用
	taskQue chan taskWork
}

3.實現協程池相關啟動,停止,擴容策略,縮減策略,備用佇列啟用等 邏輯

//得到一個執行緒池並返回 控制代碼
func (p *WorkPool) InitPool() {
	*p = WorkPool{workNum: workerNumDefault,
		maxNum: workerNumMax, stopTopic: false,
		taskPool: make(chan taskWork, workerNumDefault*2), taskQue: nil}

	(p).start()
	go (p).workerRemoveConf()
}

//開始work
func (p *WorkPool) start() {
	for i := 0; i < workerNumDefault; i++ {
		p.workInit(i)
		fmt.Println("start pool task:", i)
	}
}

//初始化 work池 後期應該考慮如何 自動 增減協程數,以達到最優
func (p *WorkPool) workInit(id int) {
	go func(idNum int) {
		//var i int = 0
		for {
			select {
			case task := <-p.taskPool:
				if task.startBool == true && task.Run != nil {
					//fmt.Print("this is pool ", idNum, "---")
					task.Run(task.params)
				}
				//單個結束任務
				if task.startBool == false {
					//fmt.Print("this is pool -- ", idNum, "---")
					return
				}
				//防止從channal 中讀取資料超時
			case <-time.After(time.Millisecond * 1000):
				//fmt.Println("time out init")
				if p.stopTopic == true && len(p.taskPool) == 0 {
					fmt.Println("topic=", p.stopTopic)
					//work數遞減
					p.workNum--
					return
				}
				//從備用佇列讀取資料
			case queTask := <-p.taskQue:
				if queTask.startBool == true && queTask.Run != nil {
					//fmt.Print("this is que ", idNum, "---")
					queTask.Run(queTask.params)
				}
			}

		}
	}(id)

}

//停止一個workPool
func (p *WorkPool) Stop() {
	p.stopTopic = true
}

//普通執行例項,非自動擴充
func (p *WorkPool) Run(funcJob Job, params ...interface{}) {
	p.taskPool <- taskWork{funcJob, true, params}
}

//用select 去做 實現 自動擴充 協程個數 啟用備用佇列等特性
func (p *WorkPool) RunAuto(funcJob Job, params ...interface{}) {
	task := taskWork{funcJob, true, params}
	select {
	//正常寫入
	case p.taskPool <- task:
		//寫入超時 說明佇列滿了 寫入備用佇列
	case <-time.After(time.Millisecond * 1000):
		p.taskQueInit()
		p.workerAddConf()
		//task 入備用佇列
		p.taskQue <- task
	}
}

//自動初始化備用佇列
func (p *WorkPool) taskQueInit() {
	//擴充佇列
	if p.taskQue == nil {
		p.taskQue = make(chan taskWork, p.maxNum*2)
	}
}

//自動擴充協程 簡單的自動擴充策略
func (p *WorkPool) workerAddConf() {
	//說明需要擴充程序  協程數量小於 1000 協程數量成倍增長
	if p.workNum < 1000 {
		p.workerAdd(p.workNum)
	} else if p.workNum < p.maxNum {
		tmpNum := p.maxNum - p.workNum
		tmpNum = tmpNum / 10
		if tmpNum == 0 {
			tmpNum = 1
		}
		p.workerAdd(1)
	}
}

//自動縮減協程 實現比較粗糙,可以考慮後續精細實現一些策略
func (p *WorkPool) workerRemoveConf() {
	for {
		select {
		case <-time.After(time.Millisecond * 1000 * 600):
			if p.workNum > workerNumDefault && len(p.taskPool) == 0 && len(p.taskQue) == 0 {
				rmNum := (p.workNum - workerNumDefault) / 5
				if rmNum == 0 {
					rmNum = 1
				}
				p.workerRemove(rmNum)
			}
		}
	}

}
func (p *WorkPool) workerAdd(num int) {
	for i := 0; i < num; i++ {
		p.workNum++
		p.workInit(p.workNum)
	}
}
func (p *WorkPool) workerRemove(num int) {
	for i := 0; i < num; i++ {
		task := taskWork{startBool: false}
		p.taskPool <- task
		p.workNum--
	}
}

4.我們來看一下使用的demo,可以很方便的把一個已有業務作為task 交給協程池執行了

package main

import (
	"fmt"
	"github.com/wangyaofenghist/go-Call/call"
	"github.com/wangyaofenghist/go-Call/test"
	"github.com/wangyaofenghist/go-worker-base/worker"
	"runtime"
	"time"
)

//宣告一號池子
var poolOne worker.WorkPool

//宣告回撥變數
var funcs call.CallMap

//以結構體方式呼叫
type runWorker struct{}

//初始化協程池 和回撥引數
func init() {
	poolOne.InitPool()
	funcs = call.CreateCall()

}

//通用回撥
func (f *runWorker) Run(param []interface{}) {
	name := param[0].(string)
	//呼叫回撥並拿回結果
	result, err := funcs.Call(name, param[1:]...)
	fmt.Println(result, err)
}
func runtimeNum() {
	for {
		fmt.Println("runtime num:", runtime.NumGoroutine())
		time.Sleep(time.Millisecond * 1000)
	}

}

//主函式
func main() {
	tmp := make(chan int)
	go runtimeNum()
	var runFunc runWorker = runWorker{}
	funcs.AddCall("test4", test.Test4)
	var startTime = time.Now().UnixNano()
	for i := 0; i < 10000; i++ {
		poolOne.RunAuto(runFunc.Run, "test4", " ee ", " ff")
	}
	<-tmp
	return
	
}

以上通用協程池實現略微粗糙,並沒有考慮太精細化的自動擴充協程策略或縮減策略,demo中可以將需要回調的函式加上sleep 會看到協程自動擴充與銷燬的過程,中間涉及到一個通用回撥是通過golang 的反射機制實現的一段通用程式碼,可以從gitHub 中拉取。