Celery的任務分發與定時任務
目錄
celery應用場景
-
celery,處理任務的Python的模組。
-
場景1:
對【耗時的任務】,通過celery,將任務新增到broker(佇列),然後立即給使用者返回一個任務ID。 當任務新增到broker之後,由worker去broker獲取任務並處理任務。 任務完完成之後,再將結果放到backend中 使用者想要檢查結果,提供任務ID,我們就可以去backend中去幫他查詢。
-
場景2:
定時任務(定時釋出/定時拍賣)
-
celery的使用
Celery是由Python開發的一個簡單、靈活、可靠的處理大量任務的分發系統,它不僅支援實時處理,也支援任務排程。
支援多個broker和worker來實現高可用和分散式。
將一些耗時的任務 扔到broker佇列中,並且會返回一個任務ID,可以通過任務ID去backend佇列中獲取結果。 worker從broker獲取任務去執行,並將結果返回到backend佇列中。
函式名、引數 傳入broker
1.1 環境的搭建
pip3 install celery==4.4 安裝broker: redis或rabbitMQ pip3 install redis / pika
1.2 快速使用
-
s1.py
from celery import Celery app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379') @app.task # 說明這個函式可以被當作celery的任務 def x1(x, y): return x + y @app.task def x2(x, y): return x - y
-
s2.py
from s1 import x1 # 立即告訴celery去建立並執行x1任務,並傳兩個引數 result = x1.delay(4, 4) print(result.id) # 任務ID
-
s3.py 獲取任務結果
from celery.result import AsyncResult from s1 import app result_object = AsyncResult(id="任務ID", app=app) print(result_object.status) # 任務狀態 data = result_object.get() # 獲取任務結果 print(data)
任務超時限制
避免某些任務一直處於非正常的進行中狀態,阻塞佇列中的其他任務。應該為任務執行設定超時時間。如果任務超時未完成,則會將 Worker 殺死,並啟動新的 Worker 來替代。
@app.task(time_limit=1800) # 可以設定任務超時限制
執行程式:
-
啟動redis
-
啟動worker
# 首先要進入當前目錄 celery worker -A s1 -l info # -A s1 找到專案 # -l info 是列印日誌log,程式碼上線時不加info
windows下會報一個錯: Traceback (most recent call last): File "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\billiard\pool.py", line 362, in workloop result = (True, prepare_result(fun(*args, **kwargs))) File "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\celery\app\trace.py", line 546, in _fast_trace_task tasks, accept, hostname = _loc ValueError: not enough values to unpack (expected 3, got 0) 解決安裝: pip install eventlet celery worker -A s1 -l info -P eventlet
-
建立任務,放入broker
python s2.py python s2.py
-
檢視任務狀態
# 在s3.py填寫任務ID ptyhon s3.py
取消任務
from s1 import app from celery.app.control import Control celery_control = Control(app=app) celery_control.revoke(id, terminate=True)
1.3 django中應用celery
在Django中用Django-celery。
# pip3 install django-celery (沒有用到,還是用的celery模組)
之後,需要按照django-celery的要求進行編寫程式碼。
-
第一步:【專案/專案/settings.py 】新增配置
CELERY_BROKER_URL = 'redis://192.168.16.85:6379' CELERY_ACCEPT_CONTENT = ['json'] CELERY_RESULT_BACKEND = 'redis://192.168.16.85:6379' CELERY_TASK_SERIALIZER = 'json' # CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間
Celery 配置引數彙總
配置項 說明 CELERY_DEFAULT_QUEUE 預設佇列 CELERY_BROKER_URL Broker 地址 CELERY_RESULT_BACKEND 結果儲存地址 CELERY_TASK_SERIALIZER 任務序列化方式 CELERY_RESULT_SERIALIZER 任務執行結果序列化方式 CELERY_TASK_RESULT_EXPIRES 任務過期時間 CELERY_ACCEPT_CONTENT 指定任務接受的內容型別(序列化) -
第二步:【專案/專案/celery.py】在專案同名目錄建立 celery.py
#!/usr/bin/env python # -*- coding:utf-8 -*- import os from celery import Celery # set the default Django settings module for the 'celery' program. # celery指定的配置檔案 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demos.settings') app = Celery('demos') # 名字隨便起,省略了broker配置,配置檔案已配置 # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. # 所有的celery配置都以CELERY開頭 app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. # 去每個已註冊app中讀取 tasks.py 檔案 app.autodiscover_tasks()
-
第三步,【專案/app名稱/tasks.py】
from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y
-
第四步,【專案/專案/
__init__.py
】from .celery import app as celery_app __all__ = ('celery_app',)
-
啟動worker
# 首先進入專案目錄 celery worker -A demos -l info -P eventlet
-
編寫檢視函式,呼叫celery去建立任務。
-
url
# from api.views import task url(r'^create/task/$', task.create_task), url(r'^get/result/$', task.get_result),
-
檢視函式
from django.shortcuts import HttpResponse from api.tasks import x1 from celery.result import AsyncResult # from demos.celery import app from demos import celery_app def create_task(request): print('請求來了') result = x1.delay(2,2) # 新增x1任務,並返回任務ID print('執行完畢') return HttpResponse(result.id) def get_result(request): nid = request.GET.get('nid') result_object = AsyncResult(id=nid, app=celery_app) # print(result_object.status) data = result_object.get() # 獲取資料 return HttpResponse(data)
-
-
啟動django程式
python manage.py ....
1.4 celery定時執行
from app01 import tasks
from celery.result import AsyncResult
def time_task(request):
"""
定時執行
:param request:
:return:
"""
# 獲取本地時間
ctime = datetime.datetime.now()
# 轉換成utc時間
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=60) # 60s後執行
ctime_x = utc_ctime + s10 # 執行的時間
# 使用apply_async並設定時間
result = tasks.mul.apply_async(args=[2, 5], eta=ctime_x)
return HttpResponse(result.id)
def time_result(request):
nid = request.GET.get('nid')
from celery.result import AsyncResult
# from demos.celery import app
from celerytest import celery_app
result_object = AsyncResult(id=nid, app=celery_app)
# print(result_object.status) # 獲取狀態
# data = result_object.get() # 獲取資料
# result_object.forget() # 把資料在backend中移除
# result_object.revoke(terminate=True) # 取消任務terminate=True強制取消
# 通過狀態絕對返回方式
if result_object.successful():
# 成功
data = result_object.get()
result_object.forget()
elif result_object.failed():
# 失敗
data = '執行失敗!'
else:
data = '執行中!'
return HttpResponse(data)
支援的引數 :
-
countdown : 等待一段時間再執行.
tasks.add.apply_async((2,3), countdown=5)
-
eta : 定義任務的開始時間.
tasks.add.apply_async((2,3), eta=now+tiedelta(second=10))
-
expires : 設定超時時間.
tasks.add.apply_async((2,3), expires=60)
-
retry : 定時如果任務失敗後, 是否重試.
tasks.add.apply_async((2,3), retry=False)
-
retry_policy : 重試策略.
- max_retries : 最大重試次數, 預設為 3 次.
- interval_start : 重試等待的時間間隔秒數, 預設為 0 , 表示直接重試不等待.
- interval_step : 每次重試讓重試間隔增加的秒數, 可以是數字或浮點數, 預設為 0.2
- interval_max : 重試間隔最大的秒數, 即 通過 interval_step 增大到多少秒之後, 就不在增加了, 可以是數字或者浮點數, 預設為 0.2 .
1.5 週期性定時任務
- celery
- django中也可以結合使用
task與shared_task裝飾器的區別:
task是通過建立的Celery物件進行呼叫
例如:
app1 = Celery('tasks', broker='redis://192.168.16.48:6379',)
app2 = Celery('tasks', broker='redis://192.168.16.48:6379',)
@app1.task
def x1(x, y):
return x - y
@app2.task
def x2(x, y):
return x * y
多用於單一檔案中,不用直接載入到記憶體。當有多個Celery物件時,任務函式可以明確使用某一個來裝飾。
shared_task多用與多個檔案使用celery,一般在celery.py中只建立一個Celery物件,例如Django整合celery。在專案啟動時,會將celery物件載入到記憶體,而@shared_task會自動將寫在各個應用下task.py的任務交與記憶體中的Celery物件,複用性較強。
例如:
@shared_task
def x1(x, y):
return x - y
from web import tasks
tasks.x1.delay(1,5)
但當在celery.py中建立多個Celery物件時,不同任務使用不同的物件,這個時候就需要指明物件名。
例如:
from web import tasks
app1.tasks.x1.delay(1,5)
app2.tasks.x2.delay(1,5)
1.6 任務繫結,記錄日誌,重試
# 修改 tasks.py 檔案.
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True)
def div(self, x, y):
logger.info(('Executing task id {0.id}, args: {0.args!r}'
'kwargs: {0.kwargs!r}').format(self.request))
try:
result = x/y
except ZeroDivisionError as e:
raise self.retry(exc=e, countdown=5, max_retries=3) # 發生 ZeroDivisionError 錯誤時, 每 5s 重試一次, 最多重試 3 次.
return result
當使用 bind=True
引數之後, 函式的引數發生變化, 多出了引數 self, 這這相當於把 div 程式設計了一個已繫結的方法, 通過 self 可以獲得任務的上下文.
1.7 啟用任務監控
Flower 是 Celery 官方推薦的實時監控工具,用於監控 Tasks 和 Workers 的執行狀態。Flower 提供了下列功能:
- 檢視 Task 清單、歷史記錄、引數、開始時間、執行狀態等
- 撤銷、終止任務
- 檢視 Worker 清單、狀態
- 遠端開啟、關閉、重啟 Worker 程序
- 提供 HTTP API,方便整合到運維繫統
相比檢視日誌,Flower 的 Web 介面會顯得更加友好。
Flower 的 supervisor 管理配置檔案:
[program:flower]
command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555
directory=/opt/PyProjects/app
autostart=true
autorestart=true
startretries=3
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3
celery面試總結
1. Celery是一個由python開發的一個簡單、靈活、可靠的,能夠處理大量任務的系統,可以做任務的分發,也能夠做定時任務。多用於耗時的操作。例如傳送簡訊、郵箱這些功能就能使用Celery做任務分發。
2. @shared_task/@task裝飾的函式說明是這一個celery任務,會新增broker中。
3. 函式名.delay(引數),會去呼叫且執行任務,並且返回任務ID。
4. 可以根據任務ID,去backend獲取任務狀態、結果。AsyncResult(id=任務ID, app=celery_app).get()獲取任務的結果;
5. apply_async(args=[引數],eta)設定定時執行任務,eta是定時任務的執行時間(utc時間)。
6. revoke()可以取消任務。