1. 程式人生 > >celery動態任務元件Demo以及原理

celery動態任務元件Demo以及原理

celery是一個基於Python的分散式排程系統,文件在這 ,最近有個需求,想要動態的新增任務而不用重啟celery服務,找了一圈沒找到什麼好辦法(也有可能是文件沒看仔細),所以只能自己實現囉

為celery動態新增任務,首先我想到的是傳遞一個函式進去,讓某個特定任務去執行這個傳遞過去的函式,就像這樣

@app.task
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

很可惜,會出現這樣的錯誤

kombu.exceptions.EncodeError: Object of type 'function' is
not JSON serializable

換一種序列化方式

@app.task(serializer='pickle')
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

結果又出現一大串錯誤資訊

ERROR/MainProcess] Pool callback raised exception: ContentDisallowed('Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)',)
Traceback (most recent call
last): File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ return obj.__dict__[self.__name__] KeyError: 'chord' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py"
, line 42, in __get__ return obj.__dict__[self.__name__] KeyError: '_payload'

換一種思路

func = import_string(func)

不知道這樣是否可以,結果測試: No

哎,流年不利.

最後一直測試,一直測試,終於找到了一種辦法,直接上程式碼

from importlib import import_module, reload

app.conf.CELERY_IMPORTS = ['task', 'task.all_task']

def import_string(import_name):
    import_name = str(import_name).replace(':', '.')
    modules = import_name.split('.')
    mod = import_module(modules[0])
    for comp in modules[1:]:
        if not hasattr(mod, comp):
            reload(mod)
        mod = getattr(mod, comp)
    return mod [ @app.task def execute(func, *args, **kwargs): func = import_string(func) return func(*args, **kwargs)] 

@app.task
def execute(func, *args, **kwargs):
    func = import_string(func)
    return func(*args, **kwargs)

專案結構是這樣的

├── celery_app.py
├── config.py
├── task
│   ├── all_task.py
│   ├── __init__.py

注意: 任務必須大於等於兩層目錄

以後每次新增任務都可以先新增到all_task.py裡,呼叫時不用再重啟celery服務

# task/all_task.py

def ee(c, d):
    return c, d, '你好'

# example
from celery_app import execute

execute.delay('task.all_task.ee', 2, 444) [ execute.delay('task.all_task.ee', 2, 444)] 

ok,另外發現celery也支援任務定時呼叫,就像這樣

execute.apply_async(args=['task.all_task.aa'], eta=datetime(2017, 7, 9, 8, 12, 0))

簡單實現一個任務重複呼叫的功能

@app.task
def interval(func, seconds, args=(), task_id=None):
    next_run_time = current_time() + timedelta(seconds=seconds)
    kwargs = dict(args=(func, seconds, args), eta=next_run_time)
    if task_id is not None:
        kwargs.update(task_id=task_id)
    interval.apply_async(**kwargs)
    func = import_string(func)
    return func(*args)

大概意思就是先計算下次執行的時間,然後把任務新增到celery佇列裡,這裡有個task_id有些問題,因為假設添加了每隔3s執行一個任務,
它的task_id預設會使用uuid生成,如果想要再移除這個任務就不太方便,自定task_id可能會好一些,另外也許需要判斷task_id是否存在

AsyncResult(task_id).state

ok,再獻上一個好用的函式

from inspect import getmembers, isfunction

def get_tasks(module='task'):
    return [{
        'name': 'task:{}'.format(f[1].__name__),
        'doc': f[1].__doc__,
    } for f in getmembers(import_module(module), isfunction)] [ from inspect import getmembers, isfunction def get_tasks(module='task'): return [{ 'name': 'task:{}'.format(f[1].__name__), 'doc': f[1].__doc__, } for f in getmembers(import_module(module), isfunction)]] 

就這樣.