1. 程式人生 > >分散式後臺任務佇列模擬(Golang)

分散式後臺任務佇列模擬(Golang)

         最近研究了下gowoker,這東西程式碼少而精,Golang真是很適合實現這類東西。
我去掉引數配置,JSON,Redis這些東西,用goworker的方式做了個最簡單的實現。

  實現如下功能:
     1. worker向JobServer註冊可執行的功能
     2. JobServer輪詢,有job就執行,沒有則繼續輪詢
     3. client向JobServer提出任務請求,並傳入引數
     4. JobServer依請求丟給worker執行(可併發或序列執行)
     5. JobServer繼續輪詢


        我弄的這個程式碼很少,其中佇列用陣列代替,同時省掉了很多東西,
但保留了其goroutine與channel最基礎的實現。

如果想看goworker的,可以參考下我這個,應當可以更快的弄明白goworker。

    演示例子及運結果:

//分散式後臺任務佇列模擬(一)
//author: Xiong Chuan Liang
//date: 2015-3-24

package main

import (
	"fmt"
	"runtime"
	//"strconv"
	"time"

	"jobserver"
)

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())

	fmt.Println("分散式後臺任務佇列模擬(一)...")

	//Job Server
	js := jobserver.NewJobServer()

	//模擬Worker端註冊
	js.RegisterWorkerClass("mail", mailWorker)
	js.RegisterWorkerClass("log", sendLogWorker)
	js.RegisterWorkerClass("exception", paincWorker)

	//模擬客戶端傳送請求
	go func() {
		time.Sleep(time.Second * 2)
		js.Enqueue("mail", "
[email protected]
", "sub", "body") js.Enqueue("test_notfound", "aaaaaaaaaaaaaaaaaaa") js.Enqueue("log", "x.log", "c.log", "l.log") //測試jobserver.PARALLEL/ORDER //for j := 0; j < 100; j++ { // js.Enqueue("mail", strconv.Itoa(j)) //} time.Sleep(time.Second) js.Enqueue("exception", "try{}exception{}") time.Sleep(time.Second * 5) js.Enqueue("mail", "
[email protected]
", "sub2", "body2") }() //啟動服務,開始輪詢 // StartServer(輪詢間隔,執行方式(併發/順序)) js.StartServer(time.Second*3, jobserver.ORDER) //PARALLEL } func mailWorker(queue string, args ...interface{}) error { fmt.Println("......mail() begin......") for _, arg := range args { fmt.Println(" args:", arg) } fmt.Println("......mail() end......") return nil } func sendLogWorker(queue string, args ...interface{}) error { fmt.Println("......sendLog() begin......") for _, arg := range args { fmt.Println(" args:", arg) } fmt.Println("......sendLog() end......") return nil } func paincWorker(queue string, args ...interface{}) error { fmt.Println("......painc() begin......") panic("\n test exception........................ \n") fmt.Println("......painc() end......") return nil } /* 執行結果: 分散式後臺任務佇列模擬(一)... [JobServer] [poll] polling ......mail() begin...... args: [email protected] args: sub args: body ......mail() end...... [JobServer] [poll] test_notfound not found ......sendLog() begin...... args: x.log args: c.log args: l.log ......sendLog() end...... [JobServer] [poll] polling ......painc() begin...... [JobServer] [run] Panicking test exception........................ [JobServer] [poll] polling [JobServer] [poll] polling ......mail() begin...... args: [email protected] args: sub2 args: body2 ......mail() end...... [JobServer] [poll] polling [JobServer] [poll] polling [JobServer] [poll] polling [JobServer] [poll] quit */
     上面是順序執行的執行結果,如果要測試併發,可以將上面程式碼註釋部分開啟,JobServer執行方式更改為jobserver.PARALLEL,再執行即可。

具體的實現在下面:

 下面兩個是基本的一些定義:

package jobserver

type workerFunc func(string, ...interface{}) error

type Workers struct {
	workers map[string]workerFunc
}

package jobserver

type OrdType int

const (
	PARALLEL = 1 << iota
	ORDER
)

關鍵的JobServer的實現:

//分散式後臺任務佇列模擬(一)
//author: Xiong Chuan Liang
//date: 2015-3-24

package jobserver

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

type JobServer struct {
	Workers
	JobQueue []*WorkerClass
	interval time.Duration
	mt       sync.Mutex
	ord      OrdType
}

func NewJobServer() *JobServer {
	s := &JobServer{}
	s.workers = make(map[string]workerFunc, 0)
	return s
}

func (s *JobServer) RegisterWorkerClass(className string, f workerFunc) int {
	s.mt.Lock()
	defer s.mt.UnLock()
	if _, found := s.workers[className]; found {
		return 1
	}
	s.workers[className] = f
	return 0
}

type WorkerClass struct {
	ClassName string
	Args      []interface{}
}

func (s *JobServer) Enqueue(className string, args ...interface{}) bool {
	s.mt.Lock()
	w := &WorkerClass{className, args}
	s.JobQueue = append(s.JobQueue, w)
	s.mt.Unlock()
	return true
}

//poller
func (s *JobServer) poll(quit <-chan bool) <-chan *WorkerClass {
	jobs := make(chan *WorkerClass)

	go func() {
		defer close(jobs)
		for {
			switch {
			case s.JobQueue == nil:
				timeout := time.After(time.Second * 2)
				select {
				case <-quit:
					fmt.Println("[JobServer] [poll] quit")
					return
				case <-timeout:
					fmt.Println("[JobServer] [poll] polling")
				}
			default:

				s.mt.Lock()
				j := s.JobQueue[0]
				if len(s.JobQueue)-1 <= 0 {
					s.JobQueue = nil
				} else {
					s.JobQueue = s.JobQueue[1:len(s.JobQueue)]
				}
				s.mt.Unlock()

				select {
				case jobs <- j:
				case <-quit:
					fmt.Println("[JobServer] [poll] quit")
					return
				}

			}
		}
	}()
	return jobs
}

//worker
func (s *JobServer) work(id int, jobs <-chan *WorkerClass, monitor *sync.WaitGroup) {
	monitor.Add(1)

	f := func() {
		defer monitor.Done()
		for job := range jobs {
			if f, found := s.workers[job.ClassName]; found {
				s.run(f, job)
			} else {
				fmt.Println("[JobServer] [poll] ", job.ClassName, " not found")
			}
		}
	}

	switch s.ord {
	case ORDER:
		f()
	default:
		go f()
	}
}

func (s *JobServer) run(f workerFunc, w *WorkerClass) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("[JobServer] [run] Panicking %s\n", fmt.Sprint(r))
		}
	}()

	f(w.ClassName, w.Args...)
}

func (s *JobServer) StartServer(interval time.Duration, ord OrdType) {

	s.interval = interval
	s.ord = ord

	quit := signals()
	jobs := s.poll(quit)

	var monitor sync.WaitGroup

	switch s.ord {
	case ORDER: //順序執行
		s.work(0, jobs, &monitor)
	default: //併發執行
		concurrency := runtime.NumCPU()
		for id := 0; id < concurrency; id++ {
			s.work(id, jobs, &monitor)
		}
	}

	monitor.Wait()
}

     goworker中要複雜的多,但簡單來說最主要的就是實現上面的這些東西,我再另增加了個順序和併發的選項。

這個例子只能在本機跑,其它東西沒有。不過配合Redis,其它客戶端或其它語言則可以通過Redis來傳遞引數及實現佇列,

把它真正用起來。

 MAIL: [email protected]

BLOG: http://blog.csdn.net/xcl168