2.快速入門(二)
我的專案目錄:
TestCelery/ ├── proj │ ├── celeryconfig.py │ ├── celery.py │ ├── init.py │ └── tasks.py └── test.py
celery.py內容如下:
from celery import Celery # 建立celery例項 app = Celery('demo') app.config_from_object('proj.celeryconfig') # 自動搜尋任務 app.autodiscover_tasks(['proj'])
celeryconfig.p模組內容如下:
from kombu import Exchange, Queue BROKER_URL = 'redis://:[email protected]:6379/1' CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/2'
tasks.py模組內容如下:
from proj.celery import app as celery_app # 建立任務函式 @celery_app.task def my_task1(): print("任務函式(my_task1)正在執行....") @celery_app.taskdef my_task2(): print("任務函式(my_task2)正在執行....") @celery_app.task def my_task3(): print("任務函式(my_task3)正在執行....")
啟動worker:
celery -A proj worker -l info
鍵入ctrl+c可關閉worker.
呼叫任務,可使用delay()方法:
my_task.delay(2, 2)
也可以使用apply_async()方法,該方法可讓我們設定一些任務執行的引數,例如,任務多久之後才執行,任務被髮送到那個佇列中等等.
my_task.apply_async((2, 2), queue='my_queue', countdown=10)
任務my_task將會被髮送到my_queue佇列中,並且在傳送10秒之後執行。
如果我們直接執行任務函式,將會直接執行此函式在當前程序中,並不會向broker傳送任何訊息。
無論是delay()還是apply_async()方式都會返回AsyncResult物件,方便跟蹤任務執行狀態,但需要我們配置result_backend.
每一個被吊用的任務都會被分配一個ID,我們叫Task ID.
1. signature
我們到目前為止只是學習瞭如何使用delay()方法,當然這個方法也是非常常用的。但是有時我們並不想簡單的將任務傳送到佇列中,我們想將一個任務函式(由引數和執行選項組成)作為一個引數傳遞給另外一個函式中,為了實現此目標,Celery使用一種叫做signatures的東西。
一個signature包裝了一個引數和執行選項的單個任務呼叫。我們可將這個signature傳遞給函式。
我們先看下tasks.py模組中定義的任務函式:
from proj.celery import app as celery_app # 建立任務函式 @celery_app.task def my_task1(): print("任務函式(my_task1)正在執行....") @celery_app.task def my_task2(): print("任務函式(my_task2)正在執行....") @celery_app.task def my_task3(): print("任務函式(my_task3)正在執行....")
我們將my_task1()任務包裝稱一個signature:
t1 = my_task1.signatures(countdown=10)
t1.delay()
2. Primitives
這些primitives本身就是signature物件,因此它們可以以多種方式組合成複雜的工作流程。primitives如下:
group: 一組任務並行執行,返回一組返回值,並可以按順序檢索返回值。
chain: 任務一個一個執行,一個執行完將執行return結果傳遞給下一個任務函式.
tasks.py模組如下:
from proj.celery import app as celery_app # 建立任務函式 @celery_app.task def my_task1(a, b): print("任務函式(my_task1)正在執行....") return a + b @celery_app.task def my_task2(a, b): print("任務函式(my_task2)正在執行....") return a + b @celery_app.task def my_task3(a, b): print("任務函式(my_task3)正在執行....") return a + b
group案例如下(test.py模組):
from proj.tasks import my_task1 from proj.tasks import my_task2 from proj.tasks import my_task3 from celery import group # 將多個signature放入同一組中 my_group = group((my_task1.s(10, 10), my_task2.s(20, 20), my_task3.s(30, 30))) ret = my_group() # 執行組任務 print(ret.get()) # 輸出每個任務結果
chain案例如下(test.py模組):
from proj.tasks import my_task1 from proj.tasks import my_task2 from proj.tasks import my_task3 from celery import chain # 將多個signature組成一個任務鏈 # my_task1的執行結果將會傳遞給my_task2 # my_task2的執行結果會傳遞給my_task3 my_chain = chain(my_task1.s(10, 10) | my_task2.s(20) | my_task3.s(30)) ret = my_chain() # 執行任務鏈 print(ret.get()) # 輸出最終結果
假如我們有兩個worker,一個worker專門用來處理郵件傳送任務和影象處理任務,一個worker專門用來處理檔案上傳任務。
我們建立兩個佇列,一個專門用於儲存郵件任務佇列和影象處理,一個用來儲存檔案上傳任務佇列。
Celery支援AMQP(Advanced Message Queue)所有的路由功能,我們也可以使用簡單的路由設定將指定的任務傳送到指定的佇列中.
我們需要配置在celeryconfig.py模組中配置 CELERY_ROUTES 項, tasks.py模組修改如下:
from proj.celery import app as celery_app @celery_app.task def my_task1(a, b): print("my_task1任務正在執行....") return a + b @celery_app.task def my_task2(a, b): print("my_task2任務正在執行....") return a + b @celery_app.task def my_task3(a, b): print("my_task3任務正在執行....") return a + b @celery_app.task def my_task4(a, b): print("my_task3任務正在執行....") return a + b @celery_app.task def my_task5(): print("my_task5任務正在執行....") @celery_app.task def my_task6(): print("my_task6任務正在執行....") @celery_app.task def my_task7(): print("my_task7任務正在執行....")
我們通過配置,將send_email和upload_file任務傳送到queue1佇列中,將image_process傳送到queue2佇列中。
我們修改celeryconfig.py:
broker_url='redis://:@127.0.0.1:6379/1' result_backend='redis://:@127.0.0.1:6379/2' task_routes=({ 'proj.tasks.my_task5': {'queue': 'queue1'}, 'proj.tasks.my_task6': {'queue': 'queue1'}, 'proj.tasks.my_task7': {'queue': 'queue2'}, }, )
test.py:
from proj.tasks import * # 傳送任務到路由指定的佇列中
my_task5.delay() my_task6.delay() my_task7.delay()
  開啟兩個worker伺服器,分別處理兩個佇列: ```python celery -A proj worker --loglevel=info -Q queue1 celery -A proj worker --loglevel=info -Q queue2
我們同樣也可以通過apply_aynsc()方法來設定任務傳送到那個佇列中:
my_task1.apply_async(queue='queue1')
我們也可設定一個worker伺服器處理兩個佇列中的任務:
celery -A proj worker --loglevel=info -Q queue1,queue2