分散式後臺任務佇列模擬(Golang)
阿新 • • 發佈:2019-01-07
最近研究了下gowoker,這東西程式碼少而精,Golang真是很適合實現這類東西。
我去掉引數配置,JSON,Redis這些東西,用goworker的方式做了個最簡單的實現。
實現如下功能:
1. worker向JobServer註冊可執行的功能
2. JobServer輪詢,有job就執行,沒有則繼續輪詢
3. client向JobServer提出任務請求,並傳入引數
4. JobServer依請求丟給worker執行(可併發或序列執行)
5. JobServer繼續輪詢
我弄的這個程式碼很少,其中佇列用陣列代替,同時省掉了很多東西,
但保留了其goroutine與channel最基礎的實現。
我去掉引數配置,JSON,Redis這些東西,用goworker的方式做了個最簡單的實現。
實現如下功能:
1. worker向JobServer註冊可執行的功能
2. JobServer輪詢,有job就執行,沒有則繼續輪詢
3. client向JobServer提出任務請求,並傳入引數
4. JobServer依請求丟給worker執行(可併發或序列執行)
5. JobServer繼續輪詢
我弄的這個程式碼很少,其中佇列用陣列代替,同時省掉了很多東西,
但保留了其goroutine與channel最基礎的實現。
如果想看goworker的,可以參考下我這個,應當可以更快的弄明白goworker。
演示例子及運結果:
//分散式後臺任務佇列模擬(一)
//author: Xiong Chuan Liang
//date: 2015-3-24
package main
import (
"fmt"
"runtime"
//"strconv"
"time"
"jobserver"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
fmt.Println("分散式後臺任務佇列模擬(一)...")
//Job Server
js := jobserver.NewJobServer()
//模擬Worker端註冊
js.RegisterWorkerClass("mail", mailWorker)
js.RegisterWorkerClass("log", sendLogWorker)
js.RegisterWorkerClass("exception", paincWorker)
//模擬客戶端傳送請求
go func() {
time.Sleep(time.Second * 2)
js.Enqueue("mail", " [email protected]", "sub", "body")
js.Enqueue("test_notfound", "aaaaaaaaaaaaaaaaaaa")
js.Enqueue("log", "x.log", "c.log", "l.log")
//測試jobserver.PARALLEL/ORDER
//for j := 0; j < 100; j++ {
// js.Enqueue("mail", strconv.Itoa(j))
//}
time.Sleep(time.Second)
js.Enqueue("exception", "try{}exception{}")
time.Sleep(time.Second * 5)
js.Enqueue("mail", " [email protected]", "sub2", "body2")
}()
//啟動服務,開始輪詢
// StartServer(輪詢間隔,執行方式(併發/順序))
js.StartServer(time.Second*3, jobserver.ORDER) //PARALLEL
}
func mailWorker(queue string, args ...interface{}) error {
fmt.Println("......mail() begin......")
for _, arg := range args {
fmt.Println(" args:", arg)
}
fmt.Println("......mail() end......")
return nil
}
func sendLogWorker(queue string, args ...interface{}) error {
fmt.Println("......sendLog() begin......")
for _, arg := range args {
fmt.Println(" args:", arg)
}
fmt.Println("......sendLog() end......")
return nil
}
func paincWorker(queue string, args ...interface{}) error {
fmt.Println("......painc() begin......")
panic("\n test exception........................ \n")
fmt.Println("......painc() end......")
return nil
}
/*
執行結果:
分散式後臺任務佇列模擬(一)...
[JobServer] [poll] polling
......mail() begin......
args: [email protected]
args: sub
args: body
......mail() end......
[JobServer] [poll] test_notfound not found
......sendLog() begin......
args: x.log
args: c.log
args: l.log
......sendLog() end......
[JobServer] [poll] polling
......painc() begin......
[JobServer] [run] Panicking
test exception........................
[JobServer] [poll] polling
[JobServer] [poll] polling
......mail() begin......
args: [email protected]
args: sub2
args: body2
......mail() end......
[JobServer] [poll] polling
[JobServer] [poll] polling
[JobServer] [poll] polling
[JobServer] [poll] quit
*/
上面是順序執行的執行結果,如果要測試併發,可以將上面程式碼註釋部分開啟,JobServer執行方式更改為jobserver.PARALLEL,再執行即可。
具體的實現在下面:
下面兩個是基本的一些定義:
package jobserver
type workerFunc func(string, ...interface{}) error
type Workers struct {
workers map[string]workerFunc
}
package jobserver
type OrdType int
const (
PARALLEL = 1 << iota
ORDER
)
關鍵的JobServer的實現:
//分散式後臺任務佇列模擬(一)
//author: Xiong Chuan Liang
//date: 2015-3-24
package jobserver
import (
"fmt"
"runtime"
"sync"
"time"
)
type JobServer struct {
Workers
JobQueue []*WorkerClass
interval time.Duration
mt sync.Mutex
ord OrdType
}
func NewJobServer() *JobServer {
s := &JobServer{}
s.workers = make(map[string]workerFunc, 0)
return s
}
func (s *JobServer) RegisterWorkerClass(className string, f workerFunc) int {
s.mt.Lock()
defer s.mt.UnLock()
if _, found := s.workers[className]; found {
return 1
}
s.workers[className] = f
return 0
}
type WorkerClass struct {
ClassName string
Args []interface{}
}
func (s *JobServer) Enqueue(className string, args ...interface{}) bool {
s.mt.Lock()
w := &WorkerClass{className, args}
s.JobQueue = append(s.JobQueue, w)
s.mt.Unlock()
return true
}
//poller
func (s *JobServer) poll(quit <-chan bool) <-chan *WorkerClass {
jobs := make(chan *WorkerClass)
go func() {
defer close(jobs)
for {
switch {
case s.JobQueue == nil:
timeout := time.After(time.Second * 2)
select {
case <-quit:
fmt.Println("[JobServer] [poll] quit")
return
case <-timeout:
fmt.Println("[JobServer] [poll] polling")
}
default:
s.mt.Lock()
j := s.JobQueue[0]
if len(s.JobQueue)-1 <= 0 {
s.JobQueue = nil
} else {
s.JobQueue = s.JobQueue[1:len(s.JobQueue)]
}
s.mt.Unlock()
select {
case jobs <- j:
case <-quit:
fmt.Println("[JobServer] [poll] quit")
return
}
}
}
}()
return jobs
}
//worker
func (s *JobServer) work(id int, jobs <-chan *WorkerClass, monitor *sync.WaitGroup) {
monitor.Add(1)
f := func() {
defer monitor.Done()
for job := range jobs {
if f, found := s.workers[job.ClassName]; found {
s.run(f, job)
} else {
fmt.Println("[JobServer] [poll] ", job.ClassName, " not found")
}
}
}
switch s.ord {
case ORDER:
f()
default:
go f()
}
}
func (s *JobServer) run(f workerFunc, w *WorkerClass) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("[JobServer] [run] Panicking %s\n", fmt.Sprint(r))
}
}()
f(w.ClassName, w.Args...)
}
func (s *JobServer) StartServer(interval time.Duration, ord OrdType) {
s.interval = interval
s.ord = ord
quit := signals()
jobs := s.poll(quit)
var monitor sync.WaitGroup
switch s.ord {
case ORDER: //順序執行
s.work(0, jobs, &monitor)
default: //併發執行
concurrency := runtime.NumCPU()
for id := 0; id < concurrency; id++ {
s.work(id, jobs, &monitor)
}
}
monitor.Wait()
}
goworker中要複雜的多,但簡單來說最主要的就是實現上面的這些東西,我再另增加了個順序和併發的選項。
這個例子只能在本機跑,其它東西沒有。不過配合Redis,其它客戶端或其它語言則可以通過Redis來傳遞引數及實現佇列,
把它真正用起來。
MAIL: [email protected]
BLOG: http://blog.csdn.net/xcl168