動手實現一個簡單的Celery
簡介
Celery是一個由Python實現的分散式任務佇列,任務佇列通常有3個方面的功能。
- 1.減緩高併發壓力,先將任務寫入佇列,有空餘資源再執行
- 2.執行定時任務,先將任務寫入佇列,指定時間下再執行
- 3.非同步任務,web中存在耗時任務可以先將其寫入佇列,然後後臺任務程式去執行
已經有很多文章來描述Celery的用法與簡單原理,本篇文章也會簡單提及,但不會費太多筆墨。
本篇重點在於,利用Python動手實現一個簡單的Celery,並使用自己實現的Celery實現非同步任務,與上一篇「Python Web:Flask非同步執行任務」一樣,通過Flask構建一個簡單的web,然後執行耗時任務,希望前端可以通過進度條顯示任務的進度。
需注意,這裡不會去解讀Celery的原始碼,其原始碼具有很多工程細節,比較複雜,這裡只是從其本質出發,簡單的實現一個玩具Celery,這個玩具Celery在穩定性、效率等方面當然不能與Celery相比,但可以很好的理解Celery大體是怎麼實現的。
本文講究的是「形離神合」,與Celery實現細節不同,但本質原理相同。
那我們開始吧!
Celery的概念與原理
Celery 5 個關鍵的概念,弄明白,就大致理解 Celery 了。
-
1.Task(任務) 簡單而言就是你要做的事情,如使用者註冊流程中的傳送郵件
-
2.Worker(工作者) 在後臺處理Task的人
-
3.Broker(經紀人) 本質是一種佇列,Task 會交給 Broker ,Worker 會從 Broker 中取 Task ,並處理
-
4.Beat 定時任務排程器,根據定的時間,向 Broker 中新增資料,然後等待 Worker 去處理
-
5.Backend 用於儲存 Worker 執行結果的物件,每個 Task 都要有返回值,這些返回值,就在 Backend 中
這裡我們拋開這裡的各種概念,從更本質的角度來看Celery,發現它就一個任務序列化儲存與反序列化獲取的過程。
以Web非同步任務為例,使用方式通常為:
- 1.有一個要長時間處理I/O的函式,如果不將其非同步執行就會產生的阻塞,這通常是不被允許的
- 2.啟動一個後臺任務執行程式
- 3.當要執行耗時函式時,不會立刻同步執行,而是提取函式的關鍵資料,將其序列化儲存到佇列中,佇列可以使用Redis或其他方式實現
- 4.後臺任務執行程式會從佇列中獲取資料,並將其反序列化還原
- 5.後臺任務執行程式會使用原來的函式以及還原的資料完成函式的執行,從而實現非同步執行的效果。
流程並不複雜,Celery中不同的概念分別負責上面流程中的不同部分。
實現一個簡單的Celery
接著我們來實現一個Celery,這裡Celery選擇Redis作為後端。
先來整理一個大體的框架。
首先我們需要定義一個Task類來表示要執行的任務,不同的任務要執行的具體邏輯由使用者自身編寫。
接著要定義一個任務佇列,即Celery中的Broker,用於儲存要執行的任務。
隨後要定義執行程式Worker,Worker要從Broker中獲取任務並去執行。
最後還需要定義一個用於儲存任務返回資料的類,即Celery中的Backend。
看上去有點複雜,不慌,其實很簡單。
實現任務類
首先來實現task.py,用於定義任務相關的一些邏輯
# task.py
import abc
import json
import uuid
import traceback
import pickle
from broker import Broker
from backend import Backend
class BaseTask(abc.ABC):
"""
Example Usage:
class AdderTask(BaseTask):
task_name = "AdderTask"
def run(self,a,b):
result = a + b
return result
adder = AdderTask()
adder.delay(9,34)
"""
task_name = None
def __init__(self):
if not self.task_name:
raise ValueError("task_name should be set")
self.broker = Broker()
@abc.abstractmethod # abstractmethod 派生類必須重寫實現邏輯
def run(self,*args,**kwargs):
# 寫上你具體的邏輯
raise NotImplementedError("Task `run` method must be implemented.")
# 更新任務狀態
def update_state(self,task_id,state,meta={}):
_task = {"state": state,"meta": meta}
serialized_task = json.dumps(_task)
backend = Backend()
backend.enqueue(queue_name=task_id,item=serialized_task)
print(f"task info: {task_id} succesfully queued")
# 非同步執行
def delay(self,**kwargs):
try:
self.task_id = str(uuid.uuid4())
_task = {"task_id": self.task_id,"args": args,"kwargs": kwargs}
serialized_task = json.dumps(_task)
# 加入redis中
self.broker.enqueue(queue_name=self.task_name,item=serialized_task)
print(f"task: {self.task_id} succesfully queued")
except Exception:
# traceback.print_exc()
raise Exception("Unable to publish task to the broker.")
return self.task_id
# 獲取資料
def async_result(task_id):
backend = Backend()
_dequeued_item = backend.dequeue(queue_name=task_id)
dequeued_item = json.loads(_dequeued_item)
state = dequeued_item["state"]
meta = dequeued_item["meta"]
class Info():
def __init__(self,meta):
self.state = state
self.meta = meta
info = Info(state,meta)
return info
複製程式碼
上述程式碼中,定義了BaseTask類,它繼承自python的abc.ABC成為一個抽象基類,其中一開始便要求必須定義task_name,這是因為後面我們需要通過task_name去找對應的任務佇列。
BaseTask類的run()方法被abc.abstractmethod裝飾,該裝飾器會要求BaseTask的派生類必須重寫run()方法,這是為了讓使用者可以自定義自己的任務邏輯。
BaseTask類的update_state()方法用於更新任務的狀態,其邏輯很簡單,先將引數進行JSON序列化,然後呼叫Backend的enqueue()方法將資料存入,這裡的Backend其實是Redis例項,enqueue()方法會將資料寫入Redis的list中,需要注意,這裡list的key為task_id,即當前任務的id。
BaseTask類的delay()方法用於非同步執行任務,首先通過uuid為任務建立一個唯一id,然後將方法的引數通過JSON序列化,然後呼叫Broker的enqueue()將資料存入,這裡的Broker其實也是一個Redis例項,enqueue()方法同樣是將資料寫入到Redis的list中,只是list的key為task_name,即當前任務的名稱。
此外還實現了async_result()方法,該方法用於非同步獲取任務的資料,通過該方法可以獲得任務的執行結果,或任務執行中的各種資料,資料的結構是有簡單約定的,必須要有state表示當然任務的狀態,必須要有meta表示當前任務的一些資訊。
實現Broker與Backend
在task.py中使用了Broker與Backend,那接著就來實現一下這兩個,先實現Broker。
# broker.py
import redis # pip install redis
class Broker:
"""
use redis as our broker.
This implements a basic FIFO queue using redis.
"""
def __init__(self):
host = "localhost"
port = 6379
password = None
self.redis_instance = redis.StrictRedis(
host=host,port=port,password=password,db=0,socket_timeout=8.0
)
def enqueue(self,item,queue_name):
self.redis_instance.lpush(queue_name,item)
def dequeue(self,queue_name):
dequed_item = self.redis_instance.brpop(queue_name,timeout=3)
if not dequed_item:
return None
dequed_item = dequed_item[1]
return dequed_item
複製程式碼
沒什麼可講的,就是定了兩個方法用於資料的儲存與讀取,儲存使用lpush方法,它會將資料從左邊插入到Redis的list中,讀取資料使用brpop方法,它會從list的右邊取出第一個元素,返回該元素值並從list刪除,左進右出就構成了一個佇列。
為了簡便,Backend的程式碼與Broker一模一樣,只是用來儲存任務的資訊而已,程式碼就不貼了。
後臺任務執行程式Worker
接著來實現後臺任務執行程式Worker
# worker.py
import json
class Worker:
"""
Example Usage:
task = AdderTask()
worker = Worker(task=task)
worker.start()
"""
def __init__(self,task) -> None:
self.task = task
def start(self,):
while True:
try:
_dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
dequeued_item = json.loads(_dequeued_item)
task_id = dequeued_item["task_id"]
task_args = dequeued_item["args"]
task_kwargs = dequeued_item["kwargs"]
task_kwargs['task_id'] = task_id
self.task.run(*task_args,**task_kwargs)
print("succesful run of task: {0}".format(task_id))
except Exception:
print("Unable to execute task.")
continue
複製程式碼
上述程式碼中,定義了Worker類,Worker類在初始化時需要指定具體的任務例項,然後從broker中獲取任務相關的資料,接著呼叫其中的run()方法完成任務的執行,比較簡單。
使用玩具Celery
玩具Celery的關鍵結構都定義好了,接著就來使用一下它,這裡依舊會使用「Python Web:Flask非同步執行任務」文章中的部分程式碼,如前端程式碼,這裡也不再討論其前端程式碼,沒有閱讀可以先閱讀一下,方便理解下面的內容。
首先定義出一個耗時任務
# app.py
class LongTask(BaseTask):
task_name = "LongTask"
def run(self,task_id):
"""Background task that runs a long function with progress reports."""
verb = ['Starting up','Booting','Repairing','Loading','Checking']
adjective = ['master','radiant','silent','harmonic','fast']
noun = ['solar array','particle reshaper','cosmic ray','orbiter','bit']
message = ''
total = random.randint(10,50)
for i in range(total):
if not message or random.random() < 0.25:
message = '{0} {1} {2}...'.format(random.choice(verb),random.choice(adjective),random.choice(noun))
self.update_state(task_id=task_id,state='PROGRESS',meta={'current': i,'total': total,'status': message})
time.sleep(1)
self.update_state(task_id=task_id,state='FINISH',meta={'current':100,'total': 100,'status': 'Task completed!','result':32})
return
複製程式碼
每個耗時任務都要繼承在BaseTask並且重寫其run()方法,run()方法中的邏輯就是當前這個耗時任務要處理的具體邏輯。
這裡邏輯其實很簡單,就是隨機的從幾個列表中抽取詞彙而已。
在for迭代中,想要前端知道當前任務for迭代的具體情況,就需要將相應的資料通過BaseTask的update_state()方法將其更新到backend中,使用task_id作為Redis中list的key。
當邏輯全部執行完後,將狀態為FINISH的資訊存入backend中。
寫一個介面來觸發這個耗時任務
# app.py
@app.route('/longtask',methods=['POST'])
def longtask():
long_task = LongTask()
task_id = long_task.delay()
return jsonify({}),202,{'Location': url_for('taskstatus',task_id=task_id)}
複製程式碼
邏輯非常簡單,例項化LongTask(),並呼叫其中的delay()方法,該方法會將當前任務存入認為佇列中,當前的請求會將當前任務的task_id通過響應包頭的中的taskstatus欄位傳遞給前端。
前端獲取到後,就可以通過task_id去獲取當前任務執行狀態等資訊,從而實現前端的視覺化。
接著定義相應的介面來獲取當前任務的資訊,呼叫用async_result()方法,將task_id傳入則可。
# app.py
@app.route('/status/<task_id>')
def taskstatus(task_id):
info = async_result(task_id)
print(info)
if info.state == 'PENDING':
response = {
'state': info.state,'current': 0,'total': 1,'status': 'Pending...'
}
elif info.state != 'FAILURE':
response = {
'state': info.state,'current': info.meta.get('current',0),'total': info.meta.get('total',1),'status': info.meta.get('status','')
}
if 'result' in info.meta:
response['result'] = info.meta['result']
else:
# something went wrong in the background job
response = {
'state': info.state,'current': 1,'status': str(info.meta),# this is the exception raised
}
return jsonify(response)
複製程式碼
最後,需要定義一個啟動後臺任務執行程式的邏輯
# run_worker.py
from worker import Worker
from app import LongTask
if __name__ == "__main__":
long_task = LongTask()
worker = Worker(task=long_task)
worker.start()
複製程式碼
至此,整體結構就構建完了,使用一下。
首先執行redis。
redis-server
複製程式碼
然後執行Flask。
python app.py
複製程式碼
最後啟動一下後臺任務執行程式,它相當於Celery的celery -A xxx worker --loglevel=info
命令。
python run_worker.py
複製程式碼
同時執行多個任務,效果如下
對應的一些列印如下:
python run_worker.py
Unable to execute task.
Unable to execute task.
Unable to execute task.
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
複製程式碼
python app.py
* Serving Flask app "app" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 145-285-706
127.0.0.1 - - [25/Sep/2019 11:14:07] "GET / HTTP/1.1" 200 -
task: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
127.0.0.1 - - [25/Sep/2019 11:14:11] "POST /longtask HTTP/1.1" 202 -
<task.async_result.<locals>.Info object at 0x107f50780>
127.0.0.1 - - [25/Sep/2019 11:14:11] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
<task.async_result.<locals>.Info object at 0x107f50a20>
127.0.0.1 - - [25/Sep/2019 11:14:13] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
複製程式碼
尾
需要注意一些,上面的程式碼中,使用Worker需要例項化具體的任務,此時任務例項與app.py中通過介面建立的任務例項是不同的,Worker利用不同的例項,使用相同的引數,從而實現執行效果相同的目的。
程式碼已上傳Githu:github.com/ayuLiao/toy…
如果你覺得文章有幫助,請按一下右下角的「在看」小星星,那是可以按的,謝謝。