1. 程式人生 > >Celery 任務分多佇列執行

Celery 任務分多佇列執行

Celery 任務分多佇列執行

需要安裝 python外掛Celery, RabbitMq

  • 程式碼結構:
    在這裡插入圖片描述

  • 建立:celery_app.py

from celery import Celery

app = Celery("TestTask")

app.config_from_object("settings")

from datetime import timedelta
from kombu import Queue
from kombu import Exchange

BROKER_URL =
'amqp://{}:{}@{}:{}'.format( 'admin', 'admin', '127.0.0.1', '5672' ) # 啟動Celery預設的定時任務 # CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' CELERY_ACCEPT_CONTENT = ['application/json'] # CELERY_TASK_SERIALIZER = 'json' # 返回資料序列化格式 CELERY_RESULT_SERIALIZER = 'json' # 時區 CELERY_TIMEZONE =
'Asia/Shanghai' CELERY_ENABLE_UTC = False # 預設開啟10程序 CELERYD_CONCURRENCY = 10 # CELERYD_MAX_TASKS_PER_CHILD = 3 # 每個worker最多執行1個任務就會被銷燬,可防止記憶體洩露 CELERY_QUEUES = ( Queue('Default', exchange=Exchange('default'), routing_key='default'), Queue('Tasks_Main', exchange=Exchange('Tasks_Main'), routing_key=
'Tasks_Main'), ) # Celery 路由設定 CELERY_ROUTES = { 'tasks.main': {'queue': 'Tasks_Main', 'routing_key': 'Tasks_Main'} } # 如果不指定QUEUE 那麼就用Default CELERY_DEFAULT_QUEUE = 'Default' CELERY_DEFAULT_EXCHANGE = 'default' CELERY_DEFAULT_ROUTING_KEY = 'default' CELERY_IMPORTS = ('tasks',)
from celery_app import app

@app.task(name='tasks.main')
def task_main(param):
    print('呼叫成功:' + str(param))
  • 執行
celery -A celery_app worker  -Q Default  -n [email protected]%d
celery -A celery_app worker  -Q Tasks_Main  -n [email protected]%d 
from tasks import task_main

task_main.apply_async(args=['Tasks_Main'], queue='Tasks_Main', routing_key='Tasks_Main')
task_main.apply_async(args=['Default'], queue='Default', routing_key='default')
  • 執行結果如下:

在這裡插入圖片描述

在這裡插入圖片描述