1. 程式人生 > >go 協程池

go 協程池

package main

import (
    "fmt"
    "log"
    "strconv"
    "sync"
)

//定義任務介面
type Runnable interface {
    run()
}

//具體任務,實現任務介面
type Task struct {
    name string //任務資料
    fun func(string)    //任務回撥方法
}

//工作協程
type Worker struct {
    id       string        //執行緒id
    taskQueue   chan Runnable //協程要執行的任務通道
wg *sync.WaitGroup //等待協程結束 } //協程池 type WorkerPool struct { workersSize int //最大協程數量 workers []*Worker //所有協程 jobQueue chan Runnable //等待執行的任務通道 workerDispatcher chan *Worker //協程分發器 wg sync.WaitGroup //等待協程池結束 } //任務方法 func (task *Task) run() { task.fun(task.name) } //啟動協程
func (w *Worker) Start() { //開一個新的協程 go func() { for { select { //從任務通道中獲得一個任務執行,如果通道中沒有任務則執行緒阻塞。 case wJob,ok := <- w.taskQueue:{ if ok { //執行任務 wJob.run() }else{ //協程通道關閉,協程結束
w.wg.Done() return } } } } }() } //停止協程 func (w *Worker) Stop() { close(w.taskQueue) } //新建一個協程物件 func (pool *WorkerPool) newWorker(Id string, taskChSize int) *Worker { log.Printf("新建執行緒:%s \n", Id) return &Worker{ id:Id, //id taskQueue: make(chan Runnable, taskChSize), //協程任務通道 wg:&pool.wg, } } //新建協程池 func NewPool(maxWorkers, workerTaskSize, maxQueue int) *WorkerPool { JobQueue := make(chan Runnable, maxQueue) workerQueue := make(chan *Worker, maxWorkers) pool := WorkerPool{ workersSize: maxWorkers, jobQueue: JobQueue, workerDispatcher: workerQueue, wg:sync.WaitGroup{}, } pool.init(workerTaskSize) return &pool } //初始化協程池中的協程 func (pool *WorkerPool) init(workerTaskSize int) { // 開始執行 for i := 0; i < pool.workersSize; i++ { worker := pool.newWorker(fmt.Sprintf("work-%s", strconv.Itoa(i)), workerTaskSize) pool.workers = append(pool.workers,worker) pool.workerDispatcher <- worker //開始工作 worker.Start() } pool.wg.Add(pool.workersSize) //監控 go pool.start() } //啟動協程池 func (pool *WorkerPool) start() { for { select { case wJob, Ok := <-pool.jobQueue: if Ok { worker := <- pool.workerDispatcher worker.taskQueue <- wJob pool.workerDispatcher <- worker } else { for _,worker := range pool.workers{ worker.Stop() } return } } } } //新增任務 func (pool *WorkerPool) addTask(job Runnable) { pool.jobQueue <- job } //協程池結束 func (pool *WorkerPool) shutdown() { close(pool.jobQueue) pool.wg.Wait() } func main() { //新建協程池 pool := NewPool(2, 10, 100) //新增100個任務 for i := 0; i < 5; i++ { task := Task{fmt.Sprintf("任務%d", i+1),func(p string){log.Println(p,"執行完成")}} pool.addTask(&task) log.Println("新增任務",i) } //關閉協程池,會等待任務結束 pool.shutdown() }
2018/09/12 18:57:30 新建執行緒:work-0 
2018/09/12 18:57:30 新建執行緒:work-1 
2018/09/12 18:57:30 新增任務 0
2018/09/12 18:57:30 新增任務 1
2018/09/12 18:57:30 新增任務 2
2018/09/12 18:57:30 新增任務 3
2018/09/12 18:57:30 新增任務 4
2018/09/12 18:57:30 任務1 執行完成
2018/09/12 18:57:30 任務3 執行完成
2018/09/12 18:57:30 任務5 執行完成
2018/09/12 18:57:30 任務2 執行完成
2018/09/12 18:57:30 任務4 執行完成

Process finished with exit code 0