1. 程式人生 > >2.快速入門(二)

2.快速入門(二)

 我的專案目錄:

TestCelery/ ├── proj │ ├── celeryconfig.py │ ├── celery.py │ ├── init.py │ └── tasks.py └── test.py

  celery.py內容如下:

from celery import Celery

# 建立celery例項
app = Celery('demo')
app.config_from_object('proj.celeryconfig')

# 自動搜尋任務
app.autodiscover_tasks(['proj'])

  celeryconfig.p模組內容如下:

from kombu import Exchange, Queue
BROKER_URL = 'redis://:[email protected]:6379/1'
CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/2'

  tasks.py模組內容如下:

from proj.celery import app as celery_app

# 建立任務函式
@celery_app.task
def my_task1():
    print("任務函式(my_task1)正在執行....")

@celery_app.task
def my_task2(): print("任務函式(my_task2)正在執行....") @celery_app.task def my_task3(): print("任務函式(my_task3)正在執行....")

  啟動worker:

celery -A proj worker -l info

 

鍵入ctrl+c可關閉worker.

呼叫任務,可使用delay()方法:

my_task.delay(2, 2)

 

  也可以使用apply_async()方法,該方法可讓我們設定一些任務執行的引數,例如,任務多久之後才執行,任務被髮送到那個佇列中等等.

my_task.apply_async((2, 2), queue='my_queue', countdown=10)

 

任務my_task將會被髮送到my_queue佇列中,並且在傳送10秒之後執行。

  如果我們直接執行任務函式,將會直接執行此函式在當前程序中,並不會向broker傳送任何訊息。

  無論是delay()還是apply_async()方式都會返回AsyncResult物件,方便跟蹤任務執行狀態,但需要我們配置result_backend.

  每一個被吊用的任務都會被分配一個ID,我們叫Task ID.

1. signature

  我們到目前為止只是學習瞭如何使用delay()方法,當然這個方法也是非常常用的。但是有時我們並不想簡單的將任務傳送到佇列中,我們想將一個任務函式(由引數和執行選項組成)作為一個引數傳遞給另外一個函式中,為了實現此目標,Celery使用一種叫做signatures的東西。

  一個signature包裝了一個引數和執行選項的單個任務呼叫。我們可將這個signature傳遞給函式。

  我們先看下tasks.py模組中定義的任務函式:

from proj.celery import app as celery_app

# 建立任務函式
@celery_app.task
def my_task1():
    print("任務函式(my_task1)正在執行....")

@celery_app.task
def my_task2():
    print("任務函式(my_task2)正在執行....")

@celery_app.task
def my_task3():
    print("任務函式(my_task3)正在執行....")

 

  我們將my_task1()任務包裝稱一個signature:

t1 = my_task1.signatures(countdown=10)
t1.delay()

 

2. Primitives

  這些primitives本身就是signature物件,因此它們可以以多種方式組合成複雜的工作流程。primitives如下:

  group: 一組任務並行執行,返回一組返回值,並可以按順序檢索返回值。

  chain: 任務一個一個執行,一個執行完將執行return結果傳遞給下一個任務函式.

  tasks.py模組如下:

from proj.celery import app as celery_app

# 建立任務函式
@celery_app.task
def my_task1(a, b):
    print("任務函式(my_task1)正在執行....")
    return a + b

@celery_app.task
def my_task2(a, b):
    print("任務函式(my_task2)正在執行....")
    return a + b

@celery_app.task
def my_task3(a, b):
    print("任務函式(my_task3)正在執行....")
    return a + b

 

  group案例如下(test.py模組):

from proj.tasks import my_task1
from proj.tasks import my_task2
from proj.tasks import my_task3
from celery import group

# 將多個signature放入同一組中
my_group = group((my_task1.s(10, 10), my_task2.s(20, 20), my_task3.s(30, 30)))
ret = my_group() # 執行組任務
print(ret.get())  # 輸出每個任務結果

 

 chain案例如下(test.py模組):

from proj.tasks import my_task1
from proj.tasks import my_task2
from proj.tasks import my_task3
from celery import chain

# 將多個signature組成一個任務鏈
# my_task1的執行結果將會傳遞給my_task2
# my_task2的執行結果會傳遞給my_task3
my_chain = chain(my_task1.s(10, 10) | my_task2.s(20) | my_task3.s(30))
ret = my_chain()  # 執行任務鏈
print(ret.get())  # 輸出最終結果

 

 假如我們有兩個worker,一個worker專門用來處理郵件傳送任務和影象處理任務,一個worker專門用來處理檔案上傳任務。

  我們建立兩個佇列,一個專門用於儲存郵件任務佇列和影象處理,一個用來儲存檔案上傳任務佇列。

  Celery支援AMQP(Advanced Message Queue)所有的路由功能,我們也可以使用簡單的路由設定將指定的任務傳送到指定的佇列中.

  我們需要配置在celeryconfig.py模組中配置 CELERY_ROUTES 項, tasks.py模組修改如下:

from proj.celery import app as celery_app


@celery_app.task
def my_task1(a, b):
    print("my_task1任務正在執行....")
    return a + b


@celery_app.task
def my_task2(a, b):
    print("my_task2任務正在執行....")
    return a + b


@celery_app.task
def my_task3(a, b):
    print("my_task3任務正在執行....")
    return a + b


@celery_app.task
def my_task4(a, b):
    print("my_task3任務正在執行....")
    return a + b


@celery_app.task
def my_task5():
    print("my_task5任務正在執行....")


@celery_app.task
def my_task6():
    print("my_task6任務正在執行....")


@celery_app.task
def my_task7():
    print("my_task7任務正在執行....")

 

  我們通過配置,將send_email和upload_file任務傳送到queue1佇列中,將image_process傳送到queue2佇列中。

  我們修改celeryconfig.py:

broker_url='redis://:@127.0.0.1:6379/1'
result_backend='redis://:@127.0.0.1:6379/2'


task_routes=({
    'proj.tasks.my_task5': {'queue': 'queue1'},
    'proj.tasks.my_task6': {'queue': 'queue1'},
    'proj.tasks.my_task7': {'queue': 'queue2'},
    },
)

 

  test.py:

from proj.tasks import *

# 傳送任務到路由指定的佇列中

 

my_task5.delay() my_task6.delay() my_task7.delay()

  開啟兩個worker伺服器,分別處理兩個佇列:
```python
celery -A proj worker --loglevel=info -Q queue1
celery -A proj worker --loglevel=info -Q queue2

 

  我們同樣也可以通過apply_aynsc()方法來設定任務傳送到那個佇列中:

my_task1.apply_async(queue='queue1')

 

  我們也可設定一個worker伺服器處理兩個佇列中的任務:

celery -A proj worker --loglevel=info -Q queue1,queue2