用golang編寫一個併發工作佇列
阿新 • • 發佈:2019-01-08
其實golang用一個函式可以構建一個併發佇列,看我之前的blog但那個功能還不夠強大,現在編寫一個靈活可控的佇列程式
先定義一個工作
type Worker struct {
ID int
RepJobs chan int64
SM *SM
quit chan bool
}
包含了workid和執行任務的id,上面的SM只是任務具體內容,這個和具體業務相關,大家自己編寫自己的SM業務邏輯
然後定義工作池
type workerPool struct {
workerChan chan *Worker
workerList []*Worker
}
這個裡面定義了一個工作佇列的切片,可以自定義工作佇列的個數,甚至後期還可以新增work,還定義了一個佇列型別的管道。定義完成過後就可以初始化工作池了
func InitWorkerPool() error {
n := 3
WorkerPool = &workerPool{
workerChan: make(chan *Worker, n),
workerList: make([]*Worker, 0, n),
}
for i := 0; i < n; i++ {
worker := NewWorker(i)
WorkerPool.workerList = append (WorkerPool.workerList, worker)
worker.Start()
log.Debugf("worker %d started", worker.ID)
}
return nil
}
這個裡面我寫死了worker的個數是3,當然這個可以通過讀取配置檔案或者引數傳遞的方式;這個裡面逐一啟動work,worker.Start(),這個是關鍵
func (w *Worker) Start() {
go func() {
for {
WorkerPool.workerChan <- w
select {
case jobID := <-w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
w.handleRepJob(jobID)
case q := <-w.quit:
if q {
log.Debugf("worker: %d, will stop.", w.ID)
return
}
}
}
}()
}
這個就是go 啟動一個協程,先把自己放到workerChan中,然後不斷從w.RepJobs管道中獲取任務並執行,如果執行完成後又把自己放回到佇列中。所以如果你要有任務需要執行,放到這個管道中即可,
func Dispatch() {
for {
select {
case job := <-jobQueue:
go func(jobID int64) {
println("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool.workerChan
worker.RepJobs <- jobID
}(job)
}
}
}
從管道中拿出一個worker並把任務id放到worker中去執行。當然你可以停止worker,甚至可以停止job
func (w *Worker) Stop() {
go func() {
w.quit <- true
}()
}
func (wp *workerPool) StopJobs(jobs []int64) {
log.Debugf("Works working on jobs: %v will be stopped", jobs)
for _, id := range jobs {
for _, w := range wp.workerList {
if w.SM.JobID == id {
log.Debugf("found a worker whose job ID is %d, will try to stop it", id)
w.SM.Stop(id)
}
}
}
}
補充一下,int64和字串轉換。
string到int
int,err:=strconv.Atoi(string)
string到int64
int64, err := strconv.ParseInt(string, 10, 64)
int到string
string:=strconv.Itoa(int)
int64到string
string:=strconv.FormatInt(int64,10)