Celery框架實現非同步執行任務
Celery
官方
Celery 官網:http://www.celeryproject.org/
Celery 官方文件英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文件中文版:http://docs.jinkan.org/docs/celery/
Celery架構
Celery的架構由三部分組成,訊息中介軟體(message broker)、任務執行單元(worker)和 任務執行結果儲存(task result store)組成。
訊息中介軟體
Celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,RabbitMQ, Redis等等
任務執行單元
Worker是Celery提供的任務執行的單元,worker併發的執行在分散式的系統節點中。
任務結果儲存
Task result store用來儲存Worker執行的任務的結果,Celery支援以不同方式儲存任務的結果,包括AMQP, redis等
使用場景
非同步任務:將耗時操作任務提交給Celery去非同步執行,比如傳送簡訊/郵件、訊息推送、音視訊處理等等
定時任務:定時執行某件事情,比如每天資料統計
Celery的安裝配置
pip install celery
訊息中介軟體:RabbitMQ/Redis
app=Celery('任務名', broker='xxx', backend='xxx')
Celery執行非同步任務
包架構封裝
project
├── celery_task # celery包
│ ├── __init__.py # 包檔案
│ ├── celery.py # celery連線和配置相關檔案,且名字必須是celery.py
│ └── tasks.py # 所有任務函式
├── add_task.py # 新增任務
└── get_result.py # 獲取結果
基本使用(新增立即執行任務)
執行流程:
1)建立app + 任務
2)啟動celery(app)服務:
命令:celery worker -A celery_task -l info
windows:
pip3 install eventlet
celery worker -A celery_task -l info -P eventlet
3)新增任務:手動新增,要自定義新增任務的指令碼,右鍵執行指令碼
4)獲取結果:手動獲取,要自定義獲取任務的指令碼,右鍵執行指令碼
celery.py
from celery import Celery
# broker: 任務倉庫
broker = 'redis://127.0.0.1:6379/5'
# backend: 任務結果倉庫
backend = 'redis://127.0.0.1:6379/6'
# include: 任務(函式)所在檔案
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py(任務檔案)
from .celery import app
import time
@app.task
def add(n, m):
print(n)
print(m)
time.sleep(10)
print('n+m的結果:%s' % (n + m))
return n + m
@app.task
def low(n, m):
print(n)
print(m)
print('n-m的結果:%s' % (n - m))
return n - m
add_task.py(新增要執行的任務)
# 右鍵執行該檔案,下面的匯入環境是合理的
from celery_task.tasks import add, low
# 往celery的Broker中新增立即任務
# 先啟動celery: celery worker -A celery_task -l info -P eventlet ,然後右鍵執行執行
t1 = add.delay(10, 20)
t2 = low.delay(50, 10)
print(t2.id)
get_result.py(檢視任務結果)
from celery_task.celery import app
from celery.result import AsyncResult
# 任務執行的id,可從上方任務執行完獲取
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
# 拿到任務執行完的結果
result = async.get()
print(result)
elif async.failed():
print('任務失敗')
elif async.status == 'PENDING':
print('任務等待中被執行')
elif async.status == 'RETRY':
print('任務異常後正在重試')
elif async.status == 'STARTED':
print('任務已經開始被執行')
高階使用(執行延遲任務)
celery.py
from celery import Celery
# broker:任務倉庫
broker = 'redis://127.0.0.1:6379/15'
# backend:任務結果倉庫
backend = 'redis://127.0.0.1:6379/15'
# include:任務(函式)所在檔案
app = Celery(broker=broker, backend=backend, include=['celery_package.tasks'])
tasks.py
from .celery import app
@app.task
def jump(n1, n2):
res = n1 * n2
print('n1 * n2 = %s' % res)
return res
add_task.py(新增延遲任務)
注:
args是jump任務需要的引數,沒有就設定為空()
eta是該任務執行的UTC格式的時間
from celery_package.tasks import jump
# # 直接執行函式
# jump(10, 20)
# 新增celery立即任務
# jump.delay(10, 20)
from datetime import datetime, timedelta
# 以秒為單位新增延遲時間
def eta_second(second):
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=second)
return utc_ctime + time_delay
# 以天為單位新增延遲時間
def eta_days(days):
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(days=days)
return utc_ctime + time_delay
# apply_async就是新增延遲任務
jump.apply_async(args=(200, 50), eta=eta_second(10))
高階使用(自動任務)
執行流程:
1)建立app + 任務
2)啟動celery(app)服務:
非windows
命令:celery worker -A celery_task -l info
windows:
pip3 install eventlet
celery worker -A celery_task -l info -P eventlet
3)新增任務:自動新增任務,所以要啟動一個新增任務的服務
命令:celery beat -A celery_task -l info
4)獲取結果:手動獲取,要自定義獲取任務的指令碼,右鍵執行指令碼
celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/15'
backend = 'redis://127.0.0.1:6379/15'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 自動任務的定時配置
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
# 定時任務:任務名自定義
'fall_task': {
'task': 'celery_task.tasks.fall', # 任務源
'args': (30, 10), # 任務引數
'schedule': timedelta(seconds=3), # 定時新增任務的時間
# 'schedule': crontab(hour=8, day_of_week=1), # 每週一早八點
}
}
tasks.py
from .celery import app
@app.task
def fall(n1, n2):
res = n1 / n2
print('n1 / n2 = %s' % res)
return res
get_result.py
from celery_task.celery import app
from celery.result import AsyncResult
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任務失敗')
elif async.status == 'PENDING':
print('任務等待中被執行')
elif async.status == 'RETRY':
print('任務異常後正在重試')
elif async.status == 'STARTED':
print('任務已經開始被執行')
django中使用
注意點:
新增自動任務時,需要另外啟動一個新增任務的服務,就是再起一個服務端執行下面的命令。
命令:celery beat -A celery_task -l info
celery.py
# 載入django環境
import os, django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
django.setup()
from celery import Celery
# 任務倉庫
broker = 'redis://127.0.0.1:6379/15'
# 任務結果倉庫
backend = 'redis://127.0.0.1:6379/15'
# include任務函式檔案的位置
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 自動任務的定時配置
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
# 定時任務:任務名自定義
'update_banner_cache': {
'task': 'celery_task.tasks.update_banner_cache', # 任務源
'args': (), # 任務引數
'schedule': timedelta(seconds=10), # 定時新增任務的時間
# 'schedule': crontab(hour=8, day_of_week=1), # 每週一早八點
}
}
tasks.py
from .celery import app
# 獲取專案中的模型類
from api.models import Banner
@app.task
def test_django_celery():
banner_query = Banner.objects.filter(is_delete=False).all()
print(banner_query)