分散式任務佇列--Celery的學習筆記
一、Celery簡介
Celery是一個簡單,靈活,可靠的分散式系統,用於處理大量訊息,同時為操作提供維護此類系統所需的工具。它是一個任務佇列,專注於實時處理,同時還支援任務排程。
所謂任務佇列,是一個邏輯上的概念,可以將抽象中的任務傳送到指定的執行任務的元件,任務佇列可以跨執行緒或機器執行。
Celery是基於Python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery。
二、Celery使用場景
1.高併發的請求任務,比如需要傳送大量請求的網路爬蟲,就可以使用Celery來加速爬取。
2.非同步任務,將耗時的操作交給Celery來完成,比如傳送/接收郵件、訊息推送等等。
3.定時任務,需要定時執行的程式,比如每天定時執行爬蟲爬取資料。
三、Celery架構
下圖是我找到的一張表示Celery架構的圖:
任務生產者:產生任務並且把任務提交到任務佇列的就是任務生產者。
任務排程Beat:Celery會根據配置檔案對任務進行調配,可以按一定時間間隔週期性地執行某些任務。
中間人Broker:Celery使用訊息進行通訊,需要中間人在客戶端和Worker之間進行傳遞,接收客戶端傳送過來的任務,並將任務分配給Worker。
在Celery的文件中,可以找到官方給出的實現Broker的工具有:
名稱 | 狀態 | 監控 | 遠端控制 |
RabbitMQ | 穩定 | 是 | 是 |
Redis | 穩定 | 是 | 是 |
Amazon SQS | 穩定 | 否 | 否 |
Zookeeper | 實驗性 | 否 | 否 |
消費者Worker:Worker是執行任務的單元,在Celery任務佇列中屬於消費者。Worker會不斷地監聽佇列,一旦有任務新增進來,就會將任務取出來進行執行。Worker還可以執行在多臺機器上,只要它們都指向同一個Broker就可以。
結果儲存Backend:結果儲存Backend,顧名思義就是將Worker執行後得到的結果儲存起來。Celery中有幾個內建的結果儲存可供選擇,包括SQLAlchemy / Django ORM、Redis、RabbitMQ、Mamcached等。
四、Celery安裝
Celery4.0版本是支援Python2.7的最後一個版本,所以如果你還在用py2的話,可能要選擇安裝Celery3或者更早的版本。我本人用的Python版本是Python3.7,然後安裝的Celery版本是4.3。安裝的話使用pip安裝就好:
pip install celery
如果pip安裝出錯的話,可以去這個網址進行下載。在使用pip安裝的時候會自動安裝一些相關依賴,如果這些依賴安裝出錯的話,搜一下相應版本的Wheel檔案下載安裝即可。
中介軟體Broker我選擇使用的是Redis,這裡就不說Redis怎麼安裝了,上一篇部落格中有Ubuntu下安裝Redis的介紹。
五、Celery使用示例
1.應用
在使用Celery的時候,第一件事是要建立一個Celery例項,一般稱之為應用,簡稱為app。建立一個test.py,其中程式碼如下:
1 from celery import Celery 2 3 4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379") 5 6 7 @app.task 8 def add(x, y): 9 return x + y
2.執行Celery伺服器
在建立好應用之後,就可以使用Celery命令執行程式執行Worker了:
celery -A test worker -l info
執行後可以看到如下圖:
有關可用命令列選項的完整列表,執行如下命令:
celery worker --help
3.呼叫任務
要呼叫任務,可以使用delay()方法。
該任務會返回一個AsyncResult例項,可用於查詢任務狀態、獲取任務返回值等。此時檢視前面執行的伺服器,會看到有如下資訊:
Received task: test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9]
Task test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9] succeeded in 0.006505205000166825s: 5
4.檢視結果
在前面定義的時候,已經選擇使用Redis作為結果後端了,所以任務執行後的結果會儲存到Redis中。而且,在呼叫任務的時候,還可以進行如下操作:
其中ready()方法會返回該任務是否已經執行,get()方法則會獲取任務返回的結果。
5.配置檔案
由於Celery的配置資訊比較多,因此一般會建立一個配置檔案來儲存這些配置資訊,通常會命名為celeryconfig.py。在test.py所在資料夾下新建配置檔案celeryconfig.py,其中的程式碼如下:
1 # broker(訊息中介軟體來接收和傳送任務訊息) 2 BROKER_URL = 'redis://127.0.0.1:6379' 3 # backend(儲存worker執行的結果) 4 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' 5 6 # 設定時間參照,不設定預設使用的UTC時間 7 CELERY_TIMEZONE = 'Asia/Shanghai' 8 # 指定任務的序列化 9 CELERY_TASK_SERIALIZER = 'json' 10 # 指定執行結果的序列化 11 CELERY_RESULT_SERIALIZER = 'json'
然後修改下test.py中的程式碼:
1 from celery import Celery 2 3 4 app = Celery("test") 5 app.config_from_object("celerystudy.celeryconfig") 6 7 8 @app.task 9 def add(x, y): 10 return x + y