go 協程池
阿新 • • 發佈:2018-12-09
package main
import (
"fmt"
"log"
"strconv"
"sync"
)
//定義任務介面
type Runnable interface {
run()
}
//具體任務,實現任務介面
type Task struct {
name string //任務資料
fun func(string) //任務回撥方法
}
//工作協程
type Worker struct {
id string //執行緒id
taskQueue chan Runnable //協程要執行的任務通道
wg *sync.WaitGroup //等待協程結束
}
//協程池
type WorkerPool struct {
workersSize int //最大協程數量
workers []*Worker //所有協程
jobQueue chan Runnable //等待執行的任務通道
workerDispatcher chan *Worker //協程分發器
wg sync.WaitGroup //等待協程池結束
}
//任務方法
func (task *Task) run() {
task.fun(task.name)
}
//啟動協程
func (w *Worker) Start() {
//開一個新的協程
go func() {
for {
select {
//從任務通道中獲得一個任務執行,如果通道中沒有任務則執行緒阻塞。
case wJob,ok := <- w.taskQueue:{
if ok {
//執行任務
wJob.run()
}else{
//協程通道關閉,協程結束
w.wg.Done()
return
}
}
}
}
}()
}
//停止協程
func (w *Worker) Stop() {
close(w.taskQueue)
}
//新建一個協程物件
func (pool *WorkerPool) newWorker(Id string, taskChSize int) *Worker {
log.Printf("新建執行緒:%s \n", Id)
return &Worker{
id:Id, //id
taskQueue: make(chan Runnable, taskChSize), //協程任務通道
wg:&pool.wg,
}
}
//新建協程池
func NewPool(maxWorkers, workerTaskSize, maxQueue int) *WorkerPool {
JobQueue := make(chan Runnable, maxQueue)
workerQueue := make(chan *Worker, maxWorkers)
pool := WorkerPool{
workersSize: maxWorkers,
jobQueue: JobQueue,
workerDispatcher: workerQueue,
wg:sync.WaitGroup{},
}
pool.init(workerTaskSize)
return &pool
}
//初始化協程池中的協程
func (pool *WorkerPool) init(workerTaskSize int) {
// 開始執行
for i := 0; i < pool.workersSize; i++ {
worker := pool.newWorker(fmt.Sprintf("work-%s", strconv.Itoa(i)), workerTaskSize)
pool.workers = append(pool.workers,worker)
pool.workerDispatcher <- worker
//開始工作
worker.Start()
}
pool.wg.Add(pool.workersSize)
//監控
go pool.start()
}
//啟動協程池
func (pool *WorkerPool) start() {
for {
select {
case wJob, Ok := <-pool.jobQueue:
if Ok {
worker := <- pool.workerDispatcher
worker.taskQueue <- wJob
pool.workerDispatcher <- worker
} else {
for _,worker := range pool.workers{
worker.Stop()
}
return
}
}
}
}
//新增任務
func (pool *WorkerPool) addTask(job Runnable) {
pool.jobQueue <- job
}
//協程池結束
func (pool *WorkerPool) shutdown() {
close(pool.jobQueue)
pool.wg.Wait()
}
func main() {
//新建協程池
pool := NewPool(2, 10, 100)
//新增100個任務
for i := 0; i < 5; i++ {
task := Task{fmt.Sprintf("任務%d", i+1),func(p string){log.Println(p,"執行完成")}}
pool.addTask(&task)
log.Println("新增任務",i)
}
//關閉協程池,會等待任務結束
pool.shutdown()
}
2018/09/12 18:57:30 新建執行緒:work-0
2018/09/12 18:57:30 新建執行緒:work-1
2018/09/12 18:57:30 新增任務 0
2018/09/12 18:57:30 新增任務 1
2018/09/12 18:57:30 新增任務 2
2018/09/12 18:57:30 新增任務 3
2018/09/12 18:57:30 新增任務 4
2018/09/12 18:57:30 任務1 執行完成
2018/09/12 18:57:30 任務3 執行完成
2018/09/12 18:57:30 任務5 執行完成
2018/09/12 18:57:30 任務2 執行完成
2018/09/12 18:57:30 任務4 執行完成
Process finished with exit code 0