1. 程式人生 > >使用 Celery Once 來防止 Celery 重複執行同一個任務

使用 Celery Once 來防止 Celery 重複執行同一個任務

使用 Celery Once 來防止 Celery 重複執行同一個任務

在使用 Celery 的時候發現有的時候 Celery 會將同一個任務執行兩遍,我遇到的情況是相同的任務在不同的 worker 中被分別執行,並且時間只相差幾毫秒。這問題我一直以為是自己哪裡處理的邏輯有問題,後來發現其他人 也有類似的問題,然後基本上出問題的都是使用 Redis 作為 Broker 的,而我這邊一方面不想將 Redis 替換掉,就只能在 task 執行的時候加分散式鎖了。

不過在 Celery 的 issue 中搜索了一下,有人使用 Redis 實現了分散式鎖,然後也有人使用了 Celery Once。 大致看了一下 Celery Once ,發現非常符合現在的情況,就用了下。

Celery Once 也是利用 Redis 加鎖來實現, Celery Once 在 Task 類基礎上實現了 QueueOnce 類,該類提供了任務去重的功能,所以在使用時,我們自己實現的方法需要將 QueueOnce 設定為 base

@task(base=QueueOnce, once={'graceful': True})

後面的 once 引數表示,在遇到重複方法時的處理方式,預設 graceful 為 False,那樣 Celery 會丟擲 AlreadyQueued 異常,手動設定為 True,則靜默處理。

另外如果要手動設定任務的 key,可以指定 keys 引數

@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

總得來說,分為幾步

第一步,安裝
pip install -U celery_once
第二步,增加配置
from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://[email protected]
//'
) celery.conf.ONCE = { 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 60 } }
第三步,修改 delay 方法
example.delay(10)
# 修改為
result = example.apply_async(args=(10))
第四步,修改 task 引數
@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

參考連結 https://github.com/cameronmaske/celery-once