1. 程式人生 > >用golang編寫一個併發工作佇列

用golang編寫一個併發工作佇列

其實golang用一個函式可以構建一個併發佇列,看我之前的blog但那個功能還不夠強大,現在編寫一個靈活可控的佇列程式
先定義一個工作

type Worker struct {
    ID      int
    RepJobs chan int64
    SM      *SM
    quit    chan bool
}

包含了workid和執行任務的id,上面的SM只是任務具體內容,這個和具體業務相關,大家自己編寫自己的SM業務邏輯

然後定義工作池

type workerPool struct {
    workerChan chan *Worker
    workerList
[]*Worker }

這個裡面定義了一個工作佇列的切片,可以自定義工作佇列的個數,甚至後期還可以新增work,還定義了一個佇列型別的管道。定義完成過後就可以初始化工作池了

func InitWorkerPool() error {
    n := 3
    WorkerPool = &workerPool{
        workerChan: make(chan *Worker, n),
        workerList: make([]*Worker, 0, n),
    }
    for i := 0; i < n; i++ {
        worker := NewWorker(i)
        WorkerPool.workerList = append
(WorkerPool.workerList, worker) worker.Start() log.Debugf("worker %d started", worker.ID) } return nil }

這個裡面我寫死了worker的個數是3,當然這個可以通過讀取配置檔案或者引數傳遞的方式;這個裡面逐一啟動work,worker.Start(),這個是關鍵

func (w *Worker) Start() {
    go func() {
        for {
            WorkerPool.workerChan <- w
            select
{ case jobID := <-w.RepJobs: log.Debugf("worker: %d, will handle job: %d", w.ID, jobID) w.handleRepJob(jobID) case q := <-w.quit: if q { log.Debugf("worker: %d, will stop.", w.ID) return } } } }() }

這個就是go 啟動一個協程,先把自己放到workerChan中,然後不斷從w.RepJobs管道中獲取任務並執行,如果執行完成後又把自己放回到佇列中。所以如果你要有任務需要執行,放到這個管道中即可,

func Dispatch() {
    for {
        select {
        case job := <-jobQueue:
            go func(jobID int64) {
                println("Trying to dispatch job: %d", jobID)
                worker := <-WorkerPool.workerChan
                worker.RepJobs <- jobID
            }(job)
        }
    }
}

從管道中拿出一個worker並把任務id放到worker中去執行。當然你可以停止worker,甚至可以停止job

func (w *Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}


func (wp *workerPool) StopJobs(jobs []int64) {
    log.Debugf("Works working on jobs: %v will be stopped", jobs)
    for _, id := range jobs {
        for _, w := range wp.workerList {
            if w.SM.JobID == id {
                log.Debugf("found a worker whose job ID is %d, will try to stop it", id)
                w.SM.Stop(id)
            }
        }
    }
}

補充一下,int64和字串轉換。
string到int
int,err:=strconv.Atoi(string)
string到int64
int64, err := strconv.ParseInt(string, 10, 64)
int到string
string:=strconv.Itoa(int)
int64到string
string:=strconv.FormatInt(int64,10)