1. 程式人生 > >分散式任務佇列 Celery —— 詳解工作流

分散式任務佇列 Celery —— 詳解工作流

目錄

前文列表

前言

Celery 的工作流具有非常濃厚的函數語言程式設計風格,在理解工作流之前,我們需要對「簽名」、「偏函式」以及「回撥函式」有所瞭解。

文中所用的示例程式碼緊接前文,其中 taks.py 模組有少量修改。

# filename: tasks.py
from proj.celery import app

@app.task
def add(x, y, debug=False):
    if debug:
        print("x: %s; y: %s" % (x, y))
    return x + y

@app.task
def log
(msg):
return "LOG: %s" % msg

任務簽名 signature

使用 Celery Signature 簽名(Subtask 子任務),可生成一個特殊的物件——任務簽名。任務簽名和函式簽名類似,除了包含函式的常規宣告資訊(形參、返回值)之外,還包含了執行該函式所需要的呼叫約定和(全部/部分)實參列表。你可以在任意位置直接使用該簽名,甚至不需要考慮實參傳遞的問題(實參可能在生成簽名時就已經包含)。可見,任務簽名的這種特效能夠讓不同任務間的組合、巢狀、調配變得簡單。

任務簽名支援「直接執行」和「Worker 執行」兩種方式:

  • 生成任務簽名並直接執行:簽名在當前程序中執行
>>> from celery import signature
>>> from proj.task.tasks import add

# 方式一
>>> signature('proj.task.tasks.add', args=(2, 3), countdown=10)
proj.task.tasks.add(2, 3)
>>> s_add = signature('proj.task.tasks.add', args=(2, 3), countdown=10)
>>> s_add()
5

# 方式二
>>> add.signature((3, 4), countdown=10) proj.task.tasks.add(3, 4) >>> s_add = add.signature((3, 4), countdown=10) >>> s_add() 7 # 方式三 >>> add.subtask((3, 4), countdown=10) proj.task.tasks.add(3, 4) >>> s_add = add.subtask((3, 4), countdown=10) >>> s_add() 7 # 方式四 >>> add.s(3, 4) proj.task.tasks.add(3, 4) >>> s_add = add.s(3, 4) >>> s_add() 7
  • 生成任務簽名並交由 Worker 執行:簽名在 Worker 服務程序中執行
# 呼叫 delay/apply_async 方法將簽名載入到 Worker 中執行
>>> s_add = add.s(2, 2)
>>> s_add.delay()       
<AsyncResult: 75be3776-b36b-458e-9a89-512121cdaa32>
>>> s_add.apply_async()
<AsyncResult: 4f1bf824-331a-42c0-9580-48b5a45c2f7a>


>>> s_add = add.s(2, 2)
>>> s_add.delay(debug=True)     # 任務簽名支援動態傳遞實參
<AsyncResult: 1a1c97c5-8e81-4871-bb8d-def39eb539fc>
>>> s_add.apply_async(kwargs={'debug': True})
<AsyncResult: 36d10f10-3e6f-46c4-9dde-d2eabb24c61c>

這裡寫圖片描述

偏函式

偏函式(Partial Function Application,PFA):將擁有任意形引數量(順序)的函式轉化為另一個已經包含了任意實參的新的函式。簡單來說,就是返回一個新的函式物件並將函式中的某些形參,固化為實參。從某個角度來看類似函式的預設引數特性,但也並不全是。因為預設引數列表是持久不變的,而 PFA 中,固化的引數列表能夠任意定義。對於同一個函式,你可以得到關於它的許多偏函式,而且每一個偏函式的固化引數列表都可以不同。e.g.

# 普通偏函式:
>>> from functools import partial
>>> add_1 = partial(add, 1)
>>> add_1(2)
3
# add_1(x) == add(1, x)

>>> int_base2 = partial(int, base=2)
>>> int_base2.__doc__ = ‘Convert base 2 string to an int.'
>>> int_base2('10010')
18
int_base2(x) == int(x, base=2)

Celery 中的偏函式實際上就是固化了部分引數的任務簽名。e.g.

# Celery 偏函式
>>> add.s(1)
proj.task.tasks.add(1)
>>> s_add_1 = add.s(1)
>>> s_add_1(10)
11
>>> s_add_1.delay(20)
<AsyncResult: eb88ad9c-31f6-484f-8fd5-735a498aedbc>

回撥函式

回撥函式就是一個通過函式指標(函式名)來呼叫的函式。如果你把函式指標作為引數傳遞給另一個函式,當這個指標被用來呼叫其所指向的函式時,我們就稱之為回撥。回撥函式在發生特定事件或滿足指定條件時被回撥,從而對該事件或條件進行響應。

Celery 中的回撥函式依舊是一個任務簽名,而觸發回撥的事件或條件就是「任務執行成功」和「任務執行失敗」。
呼叫 apply_async 方法的 link/link_error 來指定回撥函式。e.g.

  • 任務執行成功回撥:
# 預設的,回撥函式的實參來自於上一個任務的執行結果
>>> from proj.task.tasks import add, log
>>> result = add.apply_async(args=(1, 2), link=log.s())
>>> result.get()
3

這裡寫圖片描述

  • 任務執行失敗回撥:
>>> result = add.apply_async(args=(1, 2), link_error=log.s())
>>> result.status
u'SUCCESS'
>>> result.get()
3

這裡寫圖片描述

如果你希望回撥函式的實參不來自於上一個任務的結果,那麼你可以將回調函式的實參設定為 immutable(不可變的):

>>> add.apply_async((2, 2), link=log.signature(args=('Task SUCCESS', ), immutable=True))
<AsyncResult: c136ad34-68b4-49a9-8462-84ac8cd75810>
# 簡易寫法
>>> add.apply_async((2, 2), link=log.si('Task SUCCESS'))
<AsyncResult: bbb35212-5a6b-427b-a6a6-d1eb5359365e>

這裡寫圖片描述

當然了,回撥函式和偏函式可以結合使用,擁有了更好的靈活性:

>>> result = add.apply_async((2, 2), link=add.s(2))

這裡寫圖片描述

NOTE:需要注意的是,回撥函式的結果不會被返回,所以使用 Result.get 只也能獲取第一個任務的結果。

>>> result = add.apply_async((2, 2), link=add.s(2))
>>> result.get()
4

Celery 工作流

group 任務組

任務組函式接收一組任務簽名列表,返回一個新的任務簽名——簽名組,呼叫簽名組會並行執行其包含的所有任務簽名,並返回所有結果的列表。常用於一次性建立多個任務。

>>> from celery import group
>>> from proj.task.tasks import add
>>> add_group_sig = group(add.s(i, i) for i in range(10))
>>> result = add_group_sig.delay()
>>> result.get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# 返回多個結果
>>> result.results
[<AsyncResult: 1716cfd0-e87c-4b3d-a79f-1112958111b1>, 
 <AsyncResult: a7a18bde-726e-49b2-88ed-aeba5d3bf5f2>, 
 <AsyncResult: b9d9c538-2fad-475a-b3d1-bd1488278ce2>, 
 <AsyncResult: 6f370fdd-ed7e-430a-a335-af4650ca15cf>, 
 <AsyncResult: a6ddbe14-5fbd-4079-9f12-35ebbc89d89b>, 
 <AsyncResult: 65dece11-9f38-4940-9fa0-7fcf09266c7a>, 
 <AsyncResult: 8205ffc0-1056-469a-a642-96676d1518e7>, 
 <AsyncResult: e77b7e2b-66d2-48b8-9ffd-4f8fa7d9f4a4>, 
 <AsyncResult: 355b7d01-72c1-4b00-8572-407e751d76c3>, 
 <AsyncResult: aa561ac3-656f-4c81-9e3c-00c64ca49181>]

(並行執行)
這裡寫圖片描述

chain 任務鏈

任務鏈函式接收若干個任務簽名,並返回一個新的任務簽名——鏈簽名。呼叫鏈簽名會並序列執行其所含有的任務簽名,每個任務簽名的執行結果都會作為第一個實參傳遞給下一個任務簽名,最後只返回一個結果。

>>> from celery import chain
>>> from proj.task.tasks import add
>>> add_chain_sig = chain(add.s(1, 2), add.s(3))
# 精簡語法
>>> add_chain_sig = (add.s(1, 2) | add.s(3))
>>> result = add_chain_sig.delay()          # ((1 + 2) + 3)
>>> result.status
u’SUCCESS’
>>> result.get()
6
# 僅返回最終結果
>>> result.results
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'AsyncResult' object has no attribute 'results'
# 結合偏函式
>>> add_chain_sig = chain(add.s(1), add.s(3))
>>> result = add_chain_sig.delay(3)        # ((3 + 1) + 3)
>>> result.get()
7

(序列執行)
這裡寫圖片描述

chord 複合任務

複合任務函式生成一個任務簽名時,會先執行一個組簽名(不支援鏈簽名),等待任務組全部完成時執行一個回撥函式。

>>> from proj.task.tasks import add, log
>>> from celery import chord, group, chain
>>> add_chord_sig = chord(group(add.s(i, i) for i in range(10)), log.s())
>>> result = add_chord_sig.delay()
>>> result.status
u'SUCCESS'
>>> result.get()
u'LOG: [0, 2, 4, 6, 8, 10, 12, 16, 14, 18]'

這裡寫圖片描述

可見任務組函式依舊是並行執行的,但任務組和回撥函式時序列執行的,所以 chord 被稱為複合任務函式。

chunks 任務塊

任務塊函式能夠讓你將需要處理的大量物件分為分成若干個任務塊,如果你有一百萬個物件,那麼你可以建立 10 個任務塊,每個任務塊處理十萬個物件。有些人可能會擔心,分塊處理會導致並行效能下降,實際上,由於避免了訊息傳遞的開銷,因此反而會大大的提高效能。

>>> add_chunks_sig = add.chunks(zip(range(100), range(100)), 10)
>>> result = add_chunks_sig.delay()
>>> result.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], 
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38], 
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58], 
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78], 
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98], 
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118], 
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138], 
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158], 
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178], 
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

這裡寫圖片描述

map/starmap 任務對映

對映函式,與 Python 函數語言程式設計中的 map 內建函式相似。都是將序列物件中的元素作為實參依次傳遞給一個特定的函式。
map 和 starmap 的區別在於,前者的引數只有一個,後者支援的引數有多個。

>>> add.starmap(zip(range(10), range(100)))
[proj.task.tasks.add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]]
>>> result = add.starmap(zip(range(10), range(100))).delay()
>>> result.get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

這裡寫圖片描述

如果使用 map 來處理 add 函式會報錯,因為 map 只能支援一個引數的傳入。

>>> add.map(zip(range(10), range(100)))
[proj.task.tasks.add(x) for x in [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]]
>>> result = add.map(zip(range(10), range(100))).delay(1)
>>> result.status
u’FAILURE'

這裡寫圖片描述