1. 程式人生 > 其它 >解決 vuex 重新整理後資料丟失問題

解決 vuex 重新整理後資料丟失問題

Celery進階

在你的應用中使用Celery

我們的專案

proj/__init__.py
/celery.py
/tasks.py
 1 # celery.py
 2 from celery import Celery
 3  4 app = Celery('proj',
 5              broker='amqp://',  # 訊息中介(我更喜歡叫訊息樞紐)
 6              backend='rpc://',  # 後端,跟蹤任務狀態和結果
 7              include=['proj.tasks'])    # 引入指定的任務,即tasks.py
8 9 # Optional configuration, see the application user guide. 10 app.conf.update( 11 result_expires=3600, # 設定結果超時為1小時 12 ) 13 14 if __name__ == '__main__': 15 app.start() 16 # tasks.py 17 from .celery import app 18 19 20 @app.task 21 def add(x, y): 22 return x + y 23 24 25
@app.task 26 def mul(x, y): 27 return x * y 28 29 30 @app.task 31 def xsum(numbers): 32 return sum(numbers)

啟動worker

$ celery -A proj worker -l INFO
​
# 啟動成功就會看到一下展示
--------------- [email protected] v4.0 (latentcall)
--- ***** -----
-- ******* ---- [Configuration]
- *** --- * --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)            #
指的是當前可使用的程序有8個,預設是CPU的核數(程序池) - ** ---------- . events: OFF (enable -E to monitor this worker) - ** ---------- - *** --- * --- [Queues] -- ******* ---- . celery: exchange:celery(direct) binding:celery --- ***** ----- ​ [2012-06-08 16:23:51,078: WARNING/MainProcess] [email protected] has started.

Celery支援程序池、Eventlet、Gevent以及執行一個簡單的執行緒

呼叫任務

在入門的時候說了delay()是呼叫任務的方法,但它是apply_async()的快捷方式

1 add.apply_async((2,2),queue='lori',countdown=10)
2 # 第一個引數是個元組,是用來向add這個方法傳值的
3 # queue:任務將會發送到名為‘lori’的佇列中
4 # countdown:傳送訊息10秒後執行任務
5 # delay()只能傳add方法的引數,而apply_async()不僅為add傳參,還可以對訊息做相關處理

delay()和apply_async()都會返回一個AsyncResult物件,用於任務執行的狀態,但是必須另結果後臺可用,從而將資料儲存到某處(結果預設是不儲存的)

 1 res = add.delay(2,2)
 2 res.get(timeout=1)  # 獲取任務執行結果,超時為1秒
 3 res.id  # 任務的ID
 4 res.get(propagate=False)    # 遮蔽掉具體的異常展示
 5 res.failed()    # 任務執行失敗返回True,成功返回False
 6 res.successful()    # 任務執行成功返回True,失敗返回False
 7 res.state   # 返回任務的當前狀態
 8 # 啟動狀態是一種特殊的狀態,只有當task_track_started設定是啟用的,或者為任務設定了@task(track_started=True)選項時,才會記錄該狀態。
 9 # 實際上PENDING狀態不會被記錄,所以
10 from proj.celery import app
11 res = app.AyncResult('this-id-does-not-exist')  # 這樣在任務ID不存在的情況下,顯示預設的狀態
12 res.state
13 # 'PENDING'

Canvas:設計工作流

Celery提供了簽名(signature),是用來封裝任務的引數以及執行選項

與delay()和apply_async不同,簽名不會執行

 1 s1 = add.signature((2,2),countdown=10)  # 或者簡寫s1 = add.s(2,2);s()是signature()的快捷方式,像delay(),凡是快捷方式都沒有操作選項的引數
 2 res = s1.delay() # 執行最後還是要用過delay()和apply_async()
 3 
 4 # 以下兩句是等價的
 5 add.apply_async(args, kwargs, **options)
 6 add.signature(args, kwargs, **options).apply_async()
 7 
 8 # 簽名還可以進行克隆
 9 s = add.s(2)
10 >>> proj.tasks.add(2)
11  
12 s.clone(args=(4,), kwargs={'debug': True})
13 >>> proj.tasks.add(4, 2, debug=True)

部分引數拼接args

1 s1 = add.s(2)  # 
2 res = s1.deplay(8)  # 實際執行的是s1.deplay(8,2)

引數覆蓋kwargs

s2 = add.s(2,2,debug=True)
s2.delay(debuy=False)

原語

  1. Groups:組,用於並行呼叫任務,返回一個結果例項

    1 from celery import group
    2 from proj.tasks import add
    3 
    4 group(add.s(i, i) for i in range(10))().get()
    5 >>>[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    6 # 部分組,與部分簽名一樣
    7 g = group(add.s(i) for i in range(10))
    8 g(10).get()
    9 >>>[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
  2. Chains:鏈,用於序列呼叫任務,待一個任務返回結果後再呼叫下一個任務

     1 from celery import chain
     2 from proj.tasks import add, mul
     3 
     4 # (4 + 4) * 8
     5 chain(add.s(4, 4) | mul.s(8))().get() # 使用| 可以將上一個任務的結果作為引數,傳到下一個任務簽名中
     6 >>> 64
     7 # (? + 4) * 8
     8 g = chain(add.s(4) | mul.s(8))
     9 g(4).get()
    10 >>> 64
    11 (add.s(4, 4) | mul.s(8))().get()    # 簡寫可以去掉chain
    12 >>> 64
  3. Chords:弦,是一個帶有回撥的組,即序列和並行結合

    1 from celery import chord
    2 from proj.tasks import add, xsum
    3 
    4 chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()    # 將並行的結果例項傳到序列的方法中執行
    5 >>> 90
    6 (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()    # 連結到其他任務的組將自動轉換為和絃
    7 >>> 90
    8 upload_document.s(file) | group(apply_filter.s() for filter in filters)

路由

Celery支援RabbitMQ的所有路由,可將訊息傳送到指定的任務佇列

 1 # rabbitMQ路由的配置
 2 app.conf.update(
 3     task_routes = {
 4         'proj.tasks.add': {'queue': 'hipri'},
 5     },
 6 )
 7 ------------------------------
 8 # celery傳送訊息到指定佇列
 9 from proj.tasks import add
10 add.apply_async((2, 2), queue='hipri')

可以使用celery -Q指定佇列

$ celery -A proj worker -Q hipri        # 可以在執行worker的時候指定
$ celery -A proj worker -Q hipri,celery    # 用逗號分割指定多個

遠端控制

  1. 檢查

    # celery -A proj inspect --help 檢查
    celery -A proj inspect active
    celery -A proj inspect active [email protected]    # 指定worker
    
    celery -A proj status # 顯示所有worker的狀態列表
  2. 控制

    # celery -A proj control --help 控制
    celery -A proj control enable_events    # 遠端啟用事件
    celery -A proj events --dump    # 啟動事件後,可以啟動事件轉儲程式,並行檢視woker執行狀況
    celery -A proj control disable_events    # 遠端禁用事件