Celery非同步任務框架
一、Celery非同步任務框架
Celery是一個非同步任務框架,並且是一個簡單、靈活可靠的,處理大量訊息的分散式系統
Celery服務為其他專案服務提供非同步解決任務的需求,內建socket
Celery可執行的任務:執行非同步任務,執行延遲任務,執行定時任務
Celery 官方文件英文版:http://docs.celeryproject.org/en/latest/index.html
二、Celery架構
Celery是由三部分組成的,訊息中介軟體(message broker)、任務執行單元(worker)、任務結果儲存(task result store)組成的。
Broker(任務中介軟體)————>Worker(任務執行者)————>Backend(任務結果倉庫)
訊息中介軟體:Celery是不提供訊息服務的,但是可以使用第三方來提供訊息服務(提供任務),列如,Redis。
任務執行單元:Worker會自動(後臺非同步)執行訊息中介軟體(broker)中的任務任務。
任務結果儲存:將Worker執行的結果儲存在backend中,可以使用Redis來儲存
三、Celery任務結構
Celery有兩種任務結構,基本結構、包架構封裝,但是我們提倡使用包架構封裝,因為結構更加清晰,例如:
project ├── celery_task # celery包 │ ├── __init__.py # 包檔案 │ ├── celery.py # celery連線和配置相關檔案,且名字必須叫celery.py │ └── tasks.py # 所有任務函式 ├── add_task.py # 新增任務 └── get_result.py # 獲取結果
包架構封裝
celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' # broker 任務佇列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2' # backend 結果儲存,執行結果放在這裡面
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py
from .celery import app @app.task def add(x, y): return x + y @app.task def multip(x, y): return x * y
非同步任務執行:
add_task.py
把 tasks.py 中的任務函式新增到 broker 中
windows 首先需要安裝:pip install celery 和 pip install eventlet
需要啟動 celery, 在包專案下輸入以下命令:
C:\project> celery -A celery_task worker -P eventlet -l info
celery -A 包名 worker -P eventlet -l info
from celery_task.tasks import add
# 提交非同步任務
ret = add.delay(5, 3) # 把add函式任務新增到 broker中,worker在非同步實時取出執行
print(ret) # 0cc72e56-4604-4c00-bb3d-5b456f4869a7 獲取執行結果需要此ID
延遲任務執行:
add_task.py
還是需要先啟動celery
from celery_task.tasks import multip
# 提交延遲任務
from datetime import datetime, timedelta
# 需要UTC時間
eta = datetime.utcnow() + timedelta(seconds=10) # 當前UTC時間往後加10秒
ret = multip.apply_async(args=(9, 9), eta=eta) # 10 秒之後執行
print(ret) # 3c8cfa57-05ff-4a26-b8fa-1f7f2d8051f2 獲取執行結果需要此ID
定時任務執行:
執行定時任務需要從新配置celery.py
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab
broker = 'redis://127.0.0.1:6379/1' # broker 任務佇列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2' # backend 任務佇列,執行結果放在這裡面
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
app.conf.beat_schedule = {
# add 任務
'add-task': {
'task': 'celery_task.tasks.add',
'schedule': crontab(hour=8, day_of_week=1), # 每週一早八點執行一次
'args': (300, 150),
},
# multip 任務
'multip-task': {
'task': 'celery_task.tasks.multip',
'schedule': timedelta(seconds=3), # 每三秒執行一次
'args': (300, 150),
}
}
啟動 worker 等待執行任務
celery -A celery_task beat -l info
celery -A 包名 beat -l info
啟動 beat 將任務新增 broker 中,讓worker執行
celery -A celery_task worker -P eventlet -l info
celery -A 包名 worker -P eventlet -l info
檢視任務執行結果:
get_result.py
from celery_task.celery import app
from celery.result import AsyncResult
id = '3fedc0d8-32c8-4b1a-af43-fedfac6107a2'
if __name__ == '__main__':
asyncs = AsyncResult(id=id, app=app)
if asyncs.successful():
result = asyncs.get()
print(result) # 成功則取出backend中id對應的值
elif asyncs.failed():
print('任務失敗')
elif asyncs.status == 'PENDING':
print('任務等待中被執行')
elif asyncs.status == 'RETRY':
print('任務異常後正在重試,或id不存在')
elif asyncs.status == 'STARTED':
print('任務已經開始被執行')
基本結構
建立py檔案:celery_app_task.py
from celery import Celery
import time
# backend='redis://:[email protected]:6379/1' # 有密碼123456
broker = 'redis://127.0.0.1:6379/1' # broker 任務佇列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2' # backend 任務佇列,執行結果放在這裡面
app = Celery(__name__, broker=broker, backend=backend)
@app.task
def add(x, y):
return x + y
啟動 worker
celery -A celery_app_task worker -P eventlet -l info
新增任務:add_task.py
from celery_app_task import add
# 提交任務
ret = add.delay(5, 3) # 往 broker 中新增一個任務
print(ret)
學習之旅