非同步任務神器 Celery 簡明筆記
在程式的執行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程式的執行,我們經常會採用多執行緒或非同步任務。比如,在 Web 開發中,對新使用者的註冊,我們通常會給他發一封啟用郵件,而發郵件是個 IO 阻塞式任務,如果直接把它放到應用當中,就需要等郵件發出去之後才能進行下一步操作,此時使用者只能等待再等待。更好的方式是在業務邏輯中觸發一個發郵件的非同步任務,而主程式可以繼續往下執行。
Celery 是一個強大的分散式任務佇列,它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(async task)和定時任務(crontab)。它的架構組成如下圖:
可以看到,Celery 主要包含以下幾個模組:
- 任務模組 Task包含非同步任務和定時任務。其中,非同步任務通常在業務邏輯中被觸發併發往任務佇列,而定時任務由 Celery Beat 程序週期性地將任務發往任務佇列。
- 訊息中介軟體 BrokerBroker,即為任務排程佇列,接收任務生產者發來的訊息(即任務),將任務存入佇列。Celery 本身不提供佇列服務,官方推薦使用 RabbitMQ 和 Redis 等。
- 任務執行單元 WorkerWorker 是執行任務的處理單元,它實時監控訊息佇列,獲取佇列中排程的任務,並執行它。
- 任務結果儲存 BackendBackend 用於儲存任務的執行結果
非同步任務
使用 Celery 實現非同步任務主要包含三個步驟:
- 建立一個 Celery 例項
- 啟動 Celery Worker
- 應用程式呼叫非同步任務
快速入門
為了簡單起見,對於 Broker 和 Backend,這裡都使用 redis。在執行下面的例子之前,請確保 redis 已正確安裝,並開啟 redis 服務,當然,celery 也是要安裝的。可以使用下面的命令來安裝 celery 及相關依賴:
Python1 | $pip install'celery[redis]' |
建立 Celery 例項
將下面的程式碼儲存為檔案 tasks.py
:
12345678910 | # -*- coding: utf-8 -*-importtimefromcelery importCelerybroker='redis://127.0.0.1:6379'backend='redis://127.0.0.1:6379/0'app=Celery('my_task',broker=broker,backend=backend)@app.taskdefadd(x,y):time.sleep(5)# 模擬耗時操作returnx+y |
上面的程式碼做了幾件事:
- 建立了一個 Celery 例項 app,名稱為
my_task
; - 指定訊息中介軟體用 redis,URL 為
redis://127.0.0.1:6379
; - 指定儲存用 redis,URL 為
redis://127.0.0.1:6379/0
; - 建立了一個 Celery 任務
add
,當函式被@app.task
裝飾後,就成為可被 Celery 排程的任務;
啟動 Celery Worker
在當前目錄,使用如下方式啟動 Celery Worker:
Python1 | $celery worker-Atasks--loglevel=info |
其中:
- 引數
-A
指定了 Celery 例項的位置,本例是在tasks.py
中,Celery 會自動在該檔案中尋找 Celery 物件例項,當然,我們也可以自己指定,在本例,使用-A tasks.app
; - 引數
--loglevel
指定了日誌級別,預設為 warning,也可以使用-l info
來表示;
在生產環境中,我們通常會使用 Supervisor 來控制 Celery Worker 程序。
啟動成功後,控制檯會顯示如下輸出:
呼叫任務
現在,我們可以在應用程式中使用 delay()
或 apply_async()
方法來呼叫任務。
在當前目錄開啟 Python 控制檯,輸入以下程式碼:
Python123 | >>>fromtasks importadd>>>add.delay(2,8)<AsyncResult:2272ddce-8be5-493f-b5ff-35a0d9fe600f> |
在上面,我們從 tasks.py
檔案中匯入了 add
任務物件,然後使用 delay()
方法將任務傳送到訊息中介軟體(Broker),Celery Worker 程序監控到該任務後,就會進行執行。我們將視窗切換到 Worker 的啟動視窗,會看到多了兩條日誌:
12 | [2016-12-1012:00:50,376:INFO/MainProcess]Received task:tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f][2016-12-1012:00:55,385:INFO/PoolWorker-4]Task tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f]succeeded in5.00642602402s:10 |
這說明任務已經被排程並執行成功。
另外,我們如果想獲取執行後的結果,可以這樣做:
Python123456789 | >>>result=add.delay(2,6)>>>result.ready()# 使用 ready() 判斷任務是否執行完畢False>>>result.ready()False>>>result.ready()True>>>result.get()# 使用 get() 獲取任務結果8 |
在上面,我們是在 Python 的環境中呼叫任務。事實上,我們通常在應用程式中呼叫任務。比如,將下面的程式碼儲存為 client.py
:
12345 | # -*- coding: utf-8 -*-fromtasks importadd# 非同步任務add.delay(2,8)print'hello world' |
執行命令 $ python client.py
,可以看到,雖然任務函式 add
需要等待 5 秒才返回執行結果,但由於它是一個非同步任務,不會阻塞當前的主程式,因此主程式會往下執行 print
語句,打印出結果。
使用配置
在上面的例子中,我們直接把 Broker 和 Backend 的配置寫在了程式當中,更好的做法是將配置項統一寫入到一個配置檔案中,通常我們將該檔案命名為 celeryconfig.py
。Celery 的配置比較多,可以在官方文件查詢每個配置項的含義。
下面,我們再看一個例子。專案結構如下:
Python1234567 | celery_demo# 專案根目錄├──celery_app# 存放 celery 相關檔案│├──__init__.py│├──celeryconfig.py# 配置檔案│├──task1.py# 任務檔案 1│└──task2.py# 任務檔案 2└──client.py# 應用程式 |
__init__.py
程式碼如下:
1234 | # -*- coding: utf-8 -*-fromcelery importCeleryapp=Celery('demo')# 建立 Celery 例項app.config_from_object('celery_app.celeryconfig')# 通過 Celery 例項載入配置模組 |
celeryconfig.py
程式碼如下:
12345678 | BROKER_URL='redis://127.0.0.1:6379'# 指定 BrokerCELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0'# 指定 BackendCELERY_TIMEZONE='Asia/Shanghai'# 指定時區,預設是 UTC# CELERY_TIMEZONE='UTC' CELERY_IMPORTS=(# 指定匯入的任務模組'celery_app.task1','celery_app.task2') |
task1.py
程式碼如下:
123456 | importtimefromcelery_app importapp@app.taskdefadd(x,y):time.sleep(2)returnx+y |
task2.py
程式碼如下:
123456 | importtimefromcelery_app importapp@app.taskdefmultiply(x,y):time.sleep(2)returnx*y |
client.py
程式碼如下:
123456 | # -*- coding: utf-8 -*-fromcelery_app importtask1fromcelery_app importtask2task1.add.apply_async(args=[2,8])# 也可用 task1.add.delay(2, 8)task2.multiply.apply_async(args=[3,7])# 也可用 task2.multiply.delay(3, 7)print'hello world' |
現在,讓我們啟動 Celery Worker 程序,在專案的根目錄下執行下面命令:
Python1 | celery_demo$celery-Acelery_app worker--loglevel=info |
接著,執行 $ python client.py
,它會發送兩個非同步任務到 Broker,在 Worker 的視窗我們可以看到如下輸出:
1234 | [2016-12-1013:51:58,939:INFO/MainProcess]Received task:celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa][2016-12-1013:51:58,941:INFO/MainProcess]Received task:celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a][2016-12-1013:52:00,948:INFO/PoolWorker-3]Task celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa]succeeded in2.00600231002s:10[2016-12-1013:52:00,949:INFO/PoolWorker-4]Task celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a]succeeded in2.00601326401s:21 |
delay 和 apply_async
在前面的例子中,我們使用 delay()
或 apply_async()
方法來呼叫任務。事實上,delay
方法封裝了 apply_async
,如下:
123 | defdelay(self,*partial_args,**partial_kwargs):"""Shortcut to :meth:`apply_async` using star arguments."""returnself.apply_async(partial_args,partial_kwargs) |
也就是說,delay
是使用 apply_async
的快捷方式。apply_async
支援更多的引數,它的一般形式如下: