解決 vuex 重新整理後資料丟失問題
阿新 • • 發佈:2021-08-24
在你的應用中使用Celery
我們的專案
proj/__init__.py
/celery.py
/tasks.py
1 # celery.py 2 from celery import Celery 3 4 app = Celery('proj', 5 broker='amqp://', # 訊息中介(我更喜歡叫訊息樞紐) 6 backend='rpc://', # 後端,跟蹤任務狀態和結果 7 include=['proj.tasks']) # 引入指定的任務,即tasks.py8 9 # Optional configuration, see the application user guide. 10 app.conf.update( 11 result_expires=3600, # 設定結果超時為1小時 12 ) 13 14 if __name__ == '__main__': 15 app.start() 16 # tasks.py 17 from .celery import app 18 19 20 @app.task 21 def add(x, y): 22 return x + y 23 24 25@app.task 26 def mul(x, y): 27 return x * y 28 29 30 @app.task 31 def xsum(numbers): 32 return sum(numbers)
啟動worker
$ celery -A proj worker -l INFO # 啟動成功就會看到一下展示 --------------- [email protected] v4.0 (latentcall) --- ***** ----- -- ******* ---- [Configuration] - *** --- * --- . broker: amqp://guest@localhost:5672// - ** ---------- . app: __main__:0x1012d8590 - ** ---------- . concurrency: 8 (processes) #指的是當前可使用的程序有8個,預設是CPU的核數(程序池) - ** ---------- . events: OFF (enable -E to monitor this worker) - ** ---------- - *** --- * --- [Queues] -- ******* ---- . celery: exchange:celery(direct) binding:celery --- ***** ----- [2012-06-08 16:23:51,078: WARNING/MainProcess] [email protected] has started.
Celery支援程序池、Eventlet、Gevent以及執行一個簡單的執行緒
在入門的時候說了delay()是呼叫任務的方法,但它是apply_async()的快捷方式
1 add.apply_async((2,2),queue='lori',countdown=10) 2 # 第一個引數是個元組,是用來向add這個方法傳值的 3 # queue:任務將會發送到名為‘lori’的佇列中 4 # countdown:傳送訊息10秒後執行任務 5 # delay()只能傳add方法的引數,而apply_async()不僅為add傳參,還可以對訊息做相關處理
delay()和apply_async()都會返回一個AsyncResult物件,用於任務執行的狀態,但是必須另結果後臺可用,從而將資料儲存到某處(結果預設是不儲存的)
1 res = add.delay(2,2) 2 res.get(timeout=1) # 獲取任務執行結果,超時為1秒 3 res.id # 任務的ID 4 res.get(propagate=False) # 遮蔽掉具體的異常展示 5 res.failed() # 任務執行失敗返回True,成功返回False 6 res.successful() # 任務執行成功返回True,失敗返回False 7 res.state # 返回任務的當前狀態 8 # 啟動狀態是一種特殊的狀態,只有當task_track_started設定是啟用的,或者為任務設定了@task(track_started=True)選項時,才會記錄該狀態。 9 # 實際上PENDING狀態不會被記錄,所以 10 from proj.celery import app 11 res = app.AyncResult('this-id-does-not-exist') # 這樣在任務ID不存在的情況下,顯示預設的狀態 12 res.state 13 # 'PENDING'
Canvas:設計工作流
Celery提供了簽名(signature),是用來封裝任務的引數以及執行選項
與delay()和apply_async不同,簽名不會執行
1 s1 = add.signature((2,2),countdown=10) # 或者簡寫s1 = add.s(2,2);s()是signature()的快捷方式,像delay(),凡是快捷方式都沒有操作選項的引數 2 res = s1.delay() # 執行最後還是要用過delay()和apply_async() 3 4 # 以下兩句是等價的 5 add.apply_async(args, kwargs, **options) 6 add.signature(args, kwargs, **options).apply_async() 7 8 # 簽名還可以進行克隆 9 s = add.s(2) 10 >>> proj.tasks.add(2) 11 12 s.clone(args=(4,), kwargs={'debug': True}) 13 >>> proj.tasks.add(4, 2, debug=True)
部分引數拼接args
1 s1 = add.s(2) # 2 res = s1.deplay(8) # 實際執行的是s1.deplay(8,2)
引數覆蓋kwargs
s2 = add.s(2,2,debug=True)
s2.delay(debuy=False)
原語
-
Groups:組,用於並行呼叫任務,返回一個結果例項
1 from celery import group 2 from proj.tasks import add 3 4 group(add.s(i, i) for i in range(10))().get() 5 >>>[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 6 # 部分組,與部分簽名一樣 7 g = group(add.s(i) for i in range(10)) 8 g(10).get() 9 >>>[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
-
Chains:鏈,用於序列呼叫任務,待一個任務返回結果後再呼叫下一個任務
1 from celery import chain 2 from proj.tasks import add, mul 3 4 # (4 + 4) * 8 5 chain(add.s(4, 4) | mul.s(8))().get() # 使用| 可以將上一個任務的結果作為引數,傳到下一個任務簽名中 6 >>> 64 7 # (? + 4) * 8 8 g = chain(add.s(4) | mul.s(8)) 9 g(4).get() 10 >>> 64 11 (add.s(4, 4) | mul.s(8))().get() # 簡寫可以去掉chain 12 >>> 64
-
Chords:弦,是一個帶有回撥的組,即序列和並行結合
1 from celery import chord 2 from proj.tasks import add, xsum 3 4 chord((add.s(i, i) for i in xrange(10)), xsum.s())().get() # 將並行的結果例項傳到序列的方法中執行 5 >>> 90 6 (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get() # 連結到其他任務的組將自動轉換為和絃 7 >>> 90 8 upload_document.s(file) | group(apply_filter.s() for filter in filters)
路由
Celery支援RabbitMQ的所有路由,可將訊息傳送到指定的任務佇列
1 # rabbitMQ路由的配置 2 app.conf.update( 3 task_routes = { 4 'proj.tasks.add': {'queue': 'hipri'}, 5 }, 6 ) 7 ------------------------------ 8 # celery傳送訊息到指定佇列 9 from proj.tasks import add 10 add.apply_async((2, 2), queue='hipri')
可以使用celery -Q指定佇列
$ celery -A proj worker -Q hipri # 可以在執行worker的時候指定 $ celery -A proj worker -Q hipri,celery # 用逗號分割指定多個
遠端控制
-
檢查
# celery -A proj inspect --help 檢查 celery -A proj inspect active celery -A proj inspect active [email protected] # 指定worker celery -A proj status # 顯示所有worker的狀態列表
-
控制
# celery -A proj control --help 控制 celery -A proj control enable_events # 遠端啟用事件 celery -A proj events --dump # 啟動事件後,可以啟動事件轉儲程式,並行檢視woker執行狀況 celery -A proj control disable_events # 遠端禁用事件