延遲任務的幾種實現思路
前言
最近有個延遲執行的任務需求,比如發了一個定時紅包,伺服器不能相信客戶端的一切,所以就得做時間的同步,但是PHP相對來講不是很適合做這種“XX秒後去執行一個什麼樣的動作這類的行為”,但是這個功能又是不可缺少的,然後就週末花時間調研了下相關的實現。大致有如下幾種:
- 藉助Redis的sorted_set和hash結構
- 自己寫一個定時器,不斷“輪詢”觸發
- 藉助語言的非同步庫
- 藉助訊息佇列等服務。
下面針對這幾點一一做下簡單的實現, 然後考慮到可維護性, 資料丟失後怎麼恢復,服務監控等一系列問題。最後選擇一個場景上來說更合適的吧。
藉助Redis實現
在正式使用Redis來實現這一延遲需求之前,我還了解到Redis的key notification事件提醒,可以在某一個key過期的時候觸發一個動作,這對於我們做延遲任務來講,的確是很好的一個契機,但是打開了它就會不可避免的造成效率上的降低,而且線上伺服器一般不會再去修改了,因此這個特性,自己瞭解下,玩玩就行了。具體的實現還是得老老實實設計資料結構了。
結構涉及
我的做法是 QUEUE 加上 CONTAINER。即會有一個根據時間不斷往前移動的時間軸作為我們的佇列,然後在佇列上每一個時間戳,作為一個連結串列往外散發,儲存多個task。
生產者productor.php
[email protected]:/tmp$ cat productor.php <?php $redis = new Redis(); $redis->connect("localhost", 6379); $redis->select(2); $QUEUE = "asyncqueue:zset"; $SERIALIZER = "serialize:hash"; // 模擬生產延遲訊息 for($index=0; $index<10; $index++) { // 每秒可能會產生多條資料,但是隻要“當秒”有資料,就需要新增到queue中 $ts = time(); $cursecond = rand(0, 9) % 2 == 0; $tasklength = rand(0, 9) % 3; if($cursecond == true) { // 當前秒有task $redis->zadd($QUEUE, $ts, $ts); if($tasklength > 0) { for($i=0; $i<$tasklength;$i++) { $key = "2614677&".rand(0, 100000); $redis->hset($SERIALIZER.":".$ts, $key, $key); echo "[{$ts}] cursecond:{$cursecond}, KEY:{$key}\n"; } } } sleep($tasklength); }
消費者consumer.php
[email protected]:/tmp$ cat consumer.php
<?php
$redis = new Redis();
$redis->connect("localhost", 6379);
$redis->select(2);
$QUEUE = "asyncqueue:zset";
$SERIALIZER = "serialize:hash";
$counter = 0;
while(true) {
$ts = 1542596034 + $counter;
$counter++;
$ret = $redis->zrangebyscore($QUEUE, $ts, $ts, array("WITHSCORES"=>true));
// 獲取下具體的task並執行
$items = $redis->hgetall($SERIALIZER.":".$ts);
foreach($items as $key=>$member) {
echo "CONSUMER[{$ts}]\t[{$key}]\t{$member}\n";
}
if($counter>=10) {
break;
}
}
測試
先來看看生產的具體內容。
[email protected]:/tmp$ vim productor.php
[1542596034] cursecond:1, KEY:2614677&46685
[1542596034] cursecond:1, KEY:2614677&99086
[1542596036] cursecond:1, KEY:2614677&38241
[1542596037] cursecond:1, KEY:2614677&74988
[1542596038] cursecond:1, KEY:2614677&69443
[1542596038] cursecond:1, KEY:2614677&25523
[1542596040] cursecond:1, KEY:2614677&29642
[1542596040] cursecond:1, KEY:2614677&15928
[1542596042] cursecond:1, KEY:2614677&91626
[1542596042] cursecond:1, KEY:2614677&7382
Press ENTER or type command to continue
然後看看消費者是否正確消費。
Press ENTER or type command to continue
CONSUMER[1542596034] [2614677&46685] 2614677&46685
CONSUMER[1542596034] [2614677&99086] 2614677&99086
CONSUMER[1542596036] [2614677&38241] 2614677&38241
CONSUMER[1542596037] [2614677&74988] 2614677&74988
CONSUMER[1542596038] [2614677&69443] 2614677&69443
CONSUMER[1542596038] [2614677&25523] 2614677&25523
CONSUMER[1542596040] [2614677&29642] 2614677&29642
CONSUMER[1542596040] [2614677&15928] 2614677&15928
CONSUMER[1542596042] [2614677&91626] 2614677&91626
CONSUMER[1542596042] [2614677&7382] 2614677&7382
Press ENTER or type command to continue
談談看法
- 利用Redis來實現,可以看出對Redis伺服器的QPS會有一個微幅提升,這個問題可以通過multi管道來稍微優化下,這裡就不多說了。
- 資料不會丟,這樣即便是服務掛掉也能將未消費的任務進行恢復。
- 服務監控以及可維護性尚佳,基於Redis,穩定效能得到保證。
- 不用切換語言,易於實現,也無需增加額外的中介軟體,減少了維護工作。
定時器⏲
原理
在網上搜索相關實現的時候,搜到一篇不錯的文章。golang實現延遲訊息的原理與方法 不錯的文章,核心思路就在於下面這張圖了。
程式碼實現
原文程式碼中有一個bug,就是在執行任務輪詢的時候沒有做休眠,會導致服務一直全速前進,這不太好。修改後的程式碼如下:
➜ asyncdemos cat delayring.go
package main
import (
"time"
"errors"
"fmt"
"github.com/kataras/iris"
"net/http"
"bytes"
"log"
"io/ioutil"
"encoding/json"
"github.com/garyburd/redigo/redis"
"strconv"
)
const (
TASK_TYPE_INTERVAL = 1
TASK_TYPE_DELAY = 2
QUEUE_LENGTH = 10
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=b716e1f39b7fc7afbea04b23909f2f779db65a117d589f886d1757"
)
//延遲訊息
type DelayMessage struct {
//當前下標
curIndex int;
//環形槽
slots [QUEUE_LENGTH]map[string]*Task;
//關閉
closed chan bool;
//任務關閉
taskClose chan bool;
//時間關閉
timeClose chan bool;
//啟動時間
startTime time.Time;
}
//執行的任務函式
type TaskFunc func(args ...interface{});
//任務
type Task struct {
//迴圈次數
cycleNum int;
//執行的函式
exec TaskFunc;
params []interface{};
catagory int
}
//建立一個延遲訊息
func NewDelayMessage() *DelayMessage {
dm := &DelayMessage{
curIndex: 0,
closed: make(chan bool),
taskClose: make(chan bool),
timeClose: make(chan bool),
startTime: time.Now(),
};
for i := 0; i < QUEUE_LENGTH; i++ {
dm.slots[i] = make(map[string]*Task);
}
return dm;
}
//啟動延遲訊息
func (dm *DelayMessage) Start() {
go dm.taskLoop();
go dm.timeLoop();
select {
case <-dm.closed:
{
dm.taskClose <- true;
dm.timeClose <- true;
break;
}
};
}
//關閉延遲訊息
func (dm *DelayMessage) Close() {
dm.closed <- true;
}
//處理每1秒的任務
func (dm *DelayMessage) taskLoop() {
defer func() {
fmt.Println("taskLoop exit");
}();
for {
// TODO 看看怎麼優化比較合適,要不加這個的話,程式會執行超過一次
time.Sleep(time.Second)
select {
case <-dm.taskClose:
{
return;
}
default:
{
//取出當前的槽的任務
tasks := dm.slots[dm.curIndex];
if len(tasks) > 0 {
//遍歷任務,判斷任務迴圈次數等於0,則執行任務
//否則任務迴圈次數減1
for k, v := range tasks {
if v.cycleNum == 0 {
fmt.Printf("\t\t\t\t\tCURINDEX[%v], key: %v, cyclenum: %v\n", dm.curIndex, k, v.cycleNum)
go v.exec(v.params...);
//刪除執行過的任務 對於catagory=1的週期性任務不予刪除
if v.catagory != TASK_TYPE_INTERVAL {
delete(tasks, k)
}
} else {
v.cycleNum--;
}
}
}
}
}
}
}
//處理每1秒移動下標
func (dm *DelayMessage) timeLoop() {
defer func() {
fmt.Println("timeLoop exit");
}();
tick := time.NewTicker(time.Second);
for {
select {
case <-dm.timeClose:
{
return;
}
case <-tick.C:
{
fmt.Printf("%v, [%v]\n", time.Now().Format("2006-01-02 15:04:05"), dm.curIndex);
//fmt.Println(dm.slots)
//判斷當前下標,如果等於3599則重置為0,否則加1
if dm.curIndex == QUEUE_LENGTH - 1 {
dm.curIndex = 0;
} else {
dm.curIndex++;
}
}
}
}
}
//新增任務
//func (dm *DelayMessage) AddTask(t time.Time, key string, catagory int, exec TaskFunc, params []interface{}) error {
func (dm *DelayMessage) AddTask(seconds int, key string, catagory int, exec TaskFunc, params []interface{}) error {
//if dm.startTime.After(t) {
// return errors.New("時間錯誤");
//}
//當前時間與指定時間相差秒數
//subSecond := t.Unix() - dm.startTime.Unix();
//subSecond := int(t.Unix() - time.Now().Unix());
subSecond := seconds
//計算迴圈次數
cycleNum := int(subSecond / QUEUE_LENGTH);
//計算任務所在的slots的下標
ix := (subSecond + dm.curIndex ) % QUEUE_LENGTH ;
fmt.Printf("\t\t\t\t\t key: %v, cycle: %v, index: %v , curIndex: %v, subseconds: %v\n", key, cycleNum, ix, dm.curIndex, subSecond)
//把任務加入tasks中
tasks := dm.slots[ix];
if _, ok := tasks[key]; ok {
return errors.New("該slots中已存在key為" + key + "的任務");
}
tasks[key] = &Task{
cycleNum: cycleNum,
exec: exec,
params: params,
catagory: catagory,
};
// TODO 持久化部分,這樣即便中途crash,下次重啟也能得到及時的恢復
return nil;
}
func (dm *DelayMessage) DeleteTask(key string) error {
tasks := dm.slots[dm.curIndex]
if _, ok := tasks[key]; ok {
delete(tasks, key)
}
return nil
}
//func main() {
//建立延遲訊息
//dm := NewDelayMessage();
////新增任務
//dm.AddTask(time.Now().Add(time.Second*2), "test1", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{1, 2, 3});
//dm.AddTask(time.Now().Add(time.Second*4), "test2", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{4, 5, 6});
//dm.AddTask(time.Now().Add(time.Second*12), "test3", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{"hello", "world", "test"});
//dm.AddTask(time.Now().Add(time.Second), "test4", TASK_TYPE_INTERVAL, func(args ...interface{}) {
// fmt.Printf("操你媽", args...)
//}, []interface{}{1, 2, 3});
//
////40秒後關閉
////time.AfterFunc(time.Second*2, func() {
//// //dm.Close();
////});
//dm.Start();
//}
var mamager DelayMessage
//func publish(manager *DelayMessage, seconds int, key string, exec TaskFunc, params []interface{}) error {
// manager.AddTask(time.Now().Add(time.Second * time.Duration(seconds)), key, TASK_TYPE_DELAY, func(args... interface{}) {
// fmt.Println("key: " + key)
// }, params)
//
// return nil
//}
func httpPost(msg string, webhook string) {
formatter := `{
"msgtype": "text",
"text": {
"content":"%s",
},
"at": {
"atMobiles":[],
"isAtAll": false
}
}
`
content := fmt.Sprintf(formatter, msg + "[" + time.Now().String() + "]")
payload := []byte(content)
resp, err := http.Post(webhook, "application/json", bytes.NewBuffer(payload))
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(body))
}
type Message struct {
Sessionid string
Anchorid string
Msg string
}
func RedisPublish(info string) {
client, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Fatal(err)
return
}
defer client.Close()
resp, err := client.Do("publish", "channel", info)
if err != nil {
// TODO 跳轉到監控報警
log.Fatal(err)
return
}
fmt.Println(resp)
}
func main() {
manager := NewDelayMessage()
go manager.Start()
app := iris.New()
app.Get("/hello", func(context iris.Context) {
context.WriteString("pong")
})
app.Get("/publish", func(context iris.Context) {
msg := context.FormValue("msg")
seconds, _ := strconv.Atoi(context.FormValue("seconds"))
catagory, _ := strconv.Atoi(context.FormValue("catagory"))
fmt.Println("get params: " + msg)
if catagory != TASK_TYPE_DELAY || catagory != TASK_TYPE_INTERVAL {
catagory = TASK_TYPE_DELAY
}
manager.AddTask(seconds, "test1", catagory, func(args ...interface{}) {
httpPost(args[0].(string), DINGTALK_WEBHOOK)
}, []interface{}{msg})
context.WriteString("Added Succeed!" + time.Now().String())
})
app.Get("/delay", func(ctx iris.Context) {
message := Message{
Sessionid: ctx.FormValue("sessionid"),
Anchorid: ctx.FormValue("anchorid"),
Msg:ctx.FormValue("msg"),
}
jsondata, err := json.Marshal(&message)
if err != nil {
ctx.WriteString(err.Error())
}
RedisPublish(string(jsondata))
ctx.WriteString(string(jsondata))
})
app.Run(iris.Addr(":8080"))
}% ➜
測試
開啟服務go run delayring.go
, 然後在瀏覽器中訪問服務,大致含義是3秒後觸發一個timeout事件,觸發釘釘機器人訊息推送。
➜ asyncdemos go run delayring.go
Now listening on: http://localhost:8080
Application started. Press CMD+C to shut down.
2018-11-19 11:35:24, [0]
get params: 難受
key: test1, cycle: 0, index: 4 , curIndex: 1, subseconds: 3
2018-11-19 11:35:25, [1]
2018-11-19 11:35:26, [2]
2018-11-19 11:35:27, [3]
CURINDEX[4], key: test1, cyclenum: 0
{"errmsg":"ok","errcode":0}
2018-11-19 11:35:28, [4]
2018-11-19 11:35:29, [5]
2018-11-19 11:35:30, [6]
2018-11-19 11:35:31, [7]
^C[ERRO] 2018/11/19 11:35 http: Server closed
談談感受
- 仔細看測試結果,發現時間戳和對應執行時間戳還是可以對的上的。但是有一個極大的弊端就是資料。萬一服務掛掉了,資料就會全部丟掉,這是不能容忍的。
- 程式碼可維護性也較低,當然了,程式碼沒做啥涉及,封裝的不夠完善。
- 引入了額外的服務, 導致整個系統的可維護性降低,增大了服務宕機的危險。
- 語言相關性較強,對非golang的業務程式有一定的門檻。
藉助第三方庫
python的tornado一向以非同步高效率著稱,非同步對它來說就是個普通的業務。所以我們無需考慮具體的實現細節,專注於業務邏輯即可。那麼今天咱也來試試水。
程式碼實現
很幸運的一下子就搜到了對應的demo,如下:
➜ asyncdemos cat demo.py
#coding: utf8
__author__ = "郭 璞"
__email__ = "[email protected]"
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
import time
from tornado.options import define, options
define("port", default=8002, help="run on the port", type=int)
class SleepHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)
def get(self):
seconds = self.request.arguments.get("seconds", 10)
tornado.ioloop.IOLoop.instance().add_callback(self.sleep, seconds)
self.write("when i sleep")
@run_on_executor
def sleep(self, seconds):
print(time.time())
time.sleep(5)
print("yes", seconds)
print(time.time())
return seconds
if __name__ == "__main__":
# tornado.options.parse_command_line()
app = tornado.web.Application(
handlers=[(r"/sleep", SleepHandler), ])
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(8002)
tornado.ioloop.IOLoop.instance().start()%
執行服務: python demo.py
, 然後訪問服務:
檢視下輸出結果
這裡使用了預設值引數,所以可以看出也是正確的,服務在第三秒後得到了觸發並進行了對應的執行操作。
談談感受
- 庫支援,無需考慮底層細節,專注於業務流程即可。
- 面臨著和自己寫定時器一樣的問題,那就是資料的同步,以及錯誤恢復等。
- 引入了第三方服務,系統可維護性以及宕機的可能性變大。
藉助開源軟體
在和周圍人的討論中,發現延遲執行的一個解決方案就是採用訊息佇列。比如beanstalk和rabbitMQ等。我沒去調研rabbitMQ怎麼用,這塊內容挺大的,光是那一大坨的配置檔案就讓人頭大,所以我傾向於使用beanstalk。
配置環境
用之前進行安裝, 啟動即可。
# 安裝
sudo apt-get install beanstalkd
# 啟動, 並後臺執行。如果覺得不保險,還可以用nohup的形式
beanstalkd -l 127.0.0.1 -p 12345 &
使用的細節可以參考下面的這篇文章。PHP使用Beanstalkd訊息佇列
我這裡問了方便自己看下原型效果,就用python簡單寫寫了。開始之前記得安裝beanstalk的依賴庫beanstalkc,
pip install beanstalkc
程式碼實現
先來看看生產者。
[email protected]:/tmp$ cat beanstalkdemo.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
print(conn.tubes())
print(conn.stats())
conn.use("default")
ts = time.time()
handletime = ts + 10
conn.put("helloworld" + str(ts) + ", handletime:" + str(handletime), 1, 10)
print("putted")
再來看看消費者。
[email protected]:/tmp$ cat consumer.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
conn.use("default")
job = conn.reserve()
print(job.body)
job.delete()
ts = time.time()
print("CONSUME DONE: " + str(ts))
測試
- 先執行生產者。
[email protected]:/tmp$ python beanstalkdemo.py
['default']
{'current-connections': 1, 'max-job-size': 65535, 'cmd-release': 0, 'cmd-reserve': 0, 'pid': 8384, 'cmd-bury': 0, 'current-producers': 0, 'total-jobs': 0, 'current-jobs-ready': 0, 'cmd-peek-buried': 0, 'current-tubes': 1, 'id': 'b0b7cf3b44c2e296', 'current-jobs-delayed': 0, 'uptime': 2, 'cmd-watch': 0, 'hostname': 'Server218', 'job-timeouts': 0, 'cmd-stats': 1, 'rusage-stime': 0.0, 'version': 1.1, 'current-jobs-reserved': 0, 'current-jobs-buried': 0, 'cmd-reserve-with-timeout': 0, 'cmd-put': 0, 'cmd-pause-tube': 0, 'cmd-list-tubes-watched': 0, 'cmd-list-tubes': 1, 'current-workers': 0, 'cmd-list-tube-used': 0, 'cmd-ignore': 0, 'binlog-records-migrated': 0, 'current-waiting': 0, 'cmd-peek': 0, 'cmd-peek-ready': 0, 'cmd-peek-delayed': 0, 'cmd-touch': 0, 'binlog-oldest-index': 0, 'binlog-current-index': 0, 'cmd-use': 0, 'total-connections': 1, 'cmd-delete': 0, 'binlog-max-size': 10485760, 'cmd-stats-job': 0, 'rusage-utime': 0.0, 'cmd-stats-tube': 0, 'binlog-records-written': 0, 'cmd-kick': 0, 'current-jobs-urgent': 0}
putted
- 跑一下消費者,看看效果。
[email protected]:/tmp$ python consumer.py
helloworld1542605160.63, handletime:1542605170.63
CONSUME DONE: 1542605172.33
從上面可以看出,延遲執行的目標已經實現了。
談談感受
- 引入了第三方服務,造成了維護成本的增加。
- 解耦性比較好,語言無關。
- 資料可較好的儲存,不至於丟失資料,容錯性好。
總結
調研了這麼多,發現每一個都有自己的優缺點吧,沒有說哪一個是最好的選擇,只能算是合適的場景選擇合適的服務。
且行且思