Celery 學習筆記(4)- Workflow
Signature 物件
前面介紹了可以通過 delay 和 apply_async 來執行一個任務,多數情況下這已經足夠使用,但是有時候你希望能夠將任務及其引數傳遞給其它函式時,現有的方法就不夠用了。
在 Celery 中,提供了 signature 方法將函式和引數打包起來成為一個 signature 物件,在這個物件中可以儲存函式的引數以及任務執行的引數。
>>> from celery import signature
>>> signature('cele.add', args=(2, 2), countdown=10)
cele.add(2, 2 )
或者你可以直接在 task 物件上呼叫 signature 方法,生成一個 signature 物件,或者直接呼叫 s 這個快捷方法。
>>> add.signature((2, 2), countdown=10)
cele.add(2, 2)
>>> add.s(2, 2)
cele.add(2, 2)
和普通的 task 物件一樣,你可以直接在 signarture 物件上呼叫 delay 或者 apply_async 方法,向 Celery 佇列提交一個任務。
>>> add.s(2, 2).delay()
<AsyncResult: fa8f326d-9e10 -48f6-97c9-b33a33381231>
>>> add.s(2, 2).apply_async()
<AsyncResult: 6595e01e-ef73-4787-9f0e-5f2d5c35b103>
除了可以包含所有的引數之外,你也可以通過部分賦值的方式,產生一個偏函式物件,其效果就跟 partial 類似(不清楚偏函式的看這裡)。當你呼叫時,只需要提供剩餘引數就可以了。這樣生成的偏函式和普通的 signature 物件功能上沒有差別,只是相當於有了繫結的引數。
>>> partial_s = add.s(1)
>>> partial_s(2)
3
前面有提到可以通過 apply_aysnc 的 link 引數指定任務的後續任務,但是有一點不太令人滿意,就是預設情況下會將前一個任務的返回值以第一個引數的形式傳遞給後一個任務,也就是說,如果我不是要建立這種傳遞返回值的任務,但是函式定義卻需要額外多一個引數,否則在任務執行時會出錯。所以 Celery 也提供了一個機制使得取消掉這種預設的返回值傳遞,那就是將 signature 宣告為 immutable。也可以直接使用 si 這個快捷方法建立 immutable signature。
@app.task
def no_argument():
return 'No Argument'
>>> add.apply_async((2, 2), link=no_argument.signature(immutable=True))
>>> add.apply_async((2, 2), link=no_argument.si())
工作流
實際使用過程中,可能需要處理大量有關或無關的任務,所以 Celery 提供了一組函式,用來對任務執行流程進行控制。而其中的基本任務單元就是前面提到的 signature 物件。
chain - 任務的鏈式執行
chain 函式接受一個任務的列表,Celery 保證一個 chain 裡的子任務會依次執行,在 AsynResult 上執行 get 會得到最後一個任務的返回值。和 link 功能類似,每一個任務執行結果會當作引數傳入下一個任務,所以如果你不需要這種特性,採用 immutable signature 來取消。
>>> from celery import chain
>>> result = chain(add.s(1, 2), add.s(3), add.s(4)) # 1+2+3+4
>>> result().get()
10
group - 任務的併發執行
group 函式也接受一個任務列表,這些任務會同時加入到任務佇列中,且執行順序沒有任何保證。在 AsynResult 上執行 get 會得到一個包含了所有返回值的列表。
>>> from celery import group
>>> group(add.s(1, 2), add.s(3,4), add.s(5,6))().get()
[3, 7, 11]
chord - 帶回調的 group
chord 基本功能和 group 類似,只是有一個額外的回撥函式。回撥函式會在前面的任務全部結束時執行,其引數是一個包含了所有任務返回值的列表。在 AsynResult 上執行 get 會得到回撥函式的返回值。
@app.task
def xsum(values):
return sum(values)
>>> from celery import chord
>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get() # xsum 收到 [2,4,6,...,18]
90
chunks - 將大量任務分解為小塊任務
和前面三個函式不同, chunks 是在 app.task 物件上的方法,它將多個任務分成幾塊執行,每一塊是一個單獨的任務由一個 worker 執行。
>>> add.chunks(zip(range(100), range(100)), 10)().get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
[20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
[40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
[60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
[80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
[100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
[120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
[140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
[160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
[180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]