1. 程式人生 > 實用技巧 >分散式任務佇列Celery入門與進階(二)

分散式任務佇列Celery入門與進階(二)

進階使用

對於普通的任務來說可能滿足不了我們的任務需求,所以還需要了解一些進階用法,Celery提供了諸多排程方式,例如任務編排、根據任務狀態執行不同的操作、重試機制等,以下會對常用高階用法進行講述。

定時任務&計劃任務

Celery的提供的定時任務主要靠schedules來完成,通過beat元件週期性將任務傳送給woker執行。在示例中,新建檔案period_task.py,並新增任務到配置檔案中:

period_task.py:

from project import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒執行add sender.add_periodic_task( crontab(hour=16, minute=56, day_of_week=1), #每週一下午四點五十六執行sayhai sayhi.s('wd'),name='say_hi' ) @app.task def add(x,y): print(x+y)
return x+y @app.task def sayhi(name): return 'hello %s' % name

config.py

BROKER_URL = 'redis://127.0.0.1:6379/0' # Broker配置,使用Redis作為訊息中介軟體

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # BACKEND配置,這裡使用redis

CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間
CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定匯入的任務模組,可以指定多個 'project.tasks', 'project.period_task', #定時任務 )

任務編排

  在很多情況下,一個任務需要由多個子任務或者一個任務需要很多步驟才能完成,Celery同樣也能實現這樣的任務,完成這型別的任務通過以下模組完成:

  • group: 並行排程任務

  • chain: 鏈式任務排程

  • chord: 類似group,但分header和body2個部分,header可以是一個group任務,執行完成後呼叫body的任務

  • map: 對映排程,通過輸入多個入參來多次排程同一個任務

  • starmap: 類似map,入參類似*args

  • chunks: 將任務按照一定數量進行分組

tasks.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from project import app

@app.task
def add(x,y):
    return x+y


@app.task
def mul(x,y):
    return x*y


@app.task
def sum(data_list):
    res=0
    for i in data_list:
        res+=i
    return res