redis 在業務層面的應用之定時器
阿新 • • 發佈:2018-11-25
前幾天出去面試,大家都喜歡聊redis,一個是底層資料結構的實現,一個是在業務層的使用,這裡就結合一些簡單的python程式碼,講下怎樣用redis 做應用層面的定時器。
首先,當大批量任務做超時管理,就會涉及到如何實現定時器,使系統開銷最小的問題。通常的底層是一個timer 加一個最小堆(有個自己的實現,可以參考:https://my.oschina.net/u/2950272/blog/1822495),或者是環形陣列,或者是紅黑樹。每種方式都用的還比較多的,nginx 使用的是紅黑樹,golang 底層time 使用的是最小堆。當業務層面要自己實現定時回撥,其實使用redis 也可以做,而且還蠻簡單的,就是zset 就ok。
具體的思路是怎樣呢?zadd 的時候,val 設定成taskid, score 設定成為timestamp 即可。然後主程式碼,開個執行緒,隔一段時間sleep 去zRangeByScore 當前時間戳到之前標記時間的值,然後就可以簡單的用redis 管理超時任務了。
具體例子的可以看下面的一個demo:
import redis import time import threading host = "127.0.0.1" port = "6379" password = "123123" db = 0 pool = redis.ConnectionPool(host=host, port=port, password=password, db=db) rd = redis.StrictRedis(connection_pool=pool) def put_task(task_id, timeout): due_time = int(time.time() + timeout)*100 print "put", task_id rd.zadd("task", task_id, due_time) def callback(task_id): print "consume taskid", task_id def get_task(): now = int(time.time() * 100) tasks = rd.zrangebyscore("task", 0, now) if tasks: rd.zremrangebyscore("task", 0, now) return tasks def produce_task(): while True: task_id = int(time.time() * 100) put_task(task_id, 1) time.sleep(1) def consume_task(): print "start loop" while True: task_ids = get_task() if task_ids: for task_id in task_ids: callback(task_id) time.sleep(1) if __name__ == "__main__": t1 = threading.Thread(target=produce_task) t2 = threading.Thread(target=consume_task) t1.start() t2.start() t1.join() t2.join()
多個執行緒註冊回撥,一個執行緒隔一段時間去輪訓資料庫即可,具體時間精度,可以控制時間戳和和迴圈最小週期,再明白思路的情況下看這個demo 應該是很好理解的。
在我們線上測試,內網環境下,qps 在2w 是沒有壓力的。