celery+django+redis使用介紹
阿新 • • 發佈:2018-10-19
revoke sync bind __init__ ons bubuko redis class ucc
版本:
- celery==3.1.25
- django==1.8.6
- redis==2.10.6
安裝:
- 進入虛擬環境(虛擬環境創建不同,進入方式不同)
- pip install celery==3.1.25(以celery安裝為例,其他安裝方式相同)
運行環境:
- window10(celery4.0以後不支持windows)
- linux
目錄結構:
- 最外面的test1為項目名稱
- 裏面的test1與app同級目錄,裏面都是一些配置(其中celery.py放在其中)。
- app2為項目中的一個app
- tasks.py在app2中,這個tasks.py只針對app2使用。若其他app也有celery任務。建立同樣的文件名即可。
運行celery,需要的幾個文件:
- test1下的celery.py,代碼如下(裏面有介紹):
1 from __future__ import absolute_import, unicode_literals 2 import os 3 from django.conf import settings 4 from celery import Celery 5 6 7 #設置 Django 的配置文件 8 os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘test.settings‘) 9 10 # 創建 celery 實例View Code11 app = Celery(‘test1‘) 12 13 # Using a string here means the worker will not have to 14 # pickle the object when using Windows. 15 app.config_from_object(‘django.conf:settings‘) 16 17 # 搜索所有 app 中的 tasks 18 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 19 20 @app.task(bind=True) 21 defdebug_task(self): 22 print(‘Request: {0!r}‘.format(self.request))
- test1下的settings.py,代碼如下:
1 """原來的django的settings內容""" 2 3 #celery config 4 #消息中間件(使用redis),消息代理,用於發布者傳遞消息給消費者 5 BROKER_URL = ‘redis://127.0.0.1:6379‘ 6 #消息結果返回中間件(使用redis),用於存儲任務執行結果 7 CELERY_RESULT_BACKEND = ‘redis://127.0.0.1:6379‘ 8 #允許的內容類型, 9 CELERY_ACCEPT_CONTENT = [‘json‘] 10 #任務的序列化方式 11 CELERY_TASK_SERIALIZER = ‘json‘ 12 #任務結果的序列化方式 13 CELERY_RESULT_SERIALIZER = ‘json‘ 14 #celery時區,定時任務使用 15 CELERY_TIMEZONE = ‘Asia/Shanghai‘ 16 from datetime import timedelta 17 #定時任務處理,使用的schedule,裏面的task,寫上任務的“路徑”,schedule設置時間,args設置參數。 18 CELERYBEAT_SCHEDULE = { 19 ‘add_every_10_seconds‘: { 20 ‘task‘: ‘app2.tasks.add‘, 21 ‘schedule‘: timedelta(seconds=10), 22 ‘args‘: (4,4) 23 }, 24 }View Code
- test1下的__init__.py,代碼如下:
1 from __future__ import absolute_import 2 from .celery import app as celery_app 3 # 這是為了確保在django啟動時啟動 celeryView Code
- app2下的tasks.py,這個tasks文件只針對該app2使用,其他app可以根據實際情況添加tasks.py文件,裏面寫相應的任務代碼。代碼如下:
1 # -*- coding:utf-8 -*- 2 from __future__ import absolute_import 3 from auto_model_platform.celery import app 4 5 @app.task 6 def add(x, y): 7 time.sleep(30) 8 print("running...", x, y) 9 return x + yView Code
調用tasks.py內的函數:
- 在app2.views.py中的某個視圖函數中直接調用,比如調用tasks.py中的add(x,y)函數,這樣調用。代碼如下:
1 from app2.tasks import add 2 3 class TestView(View): 4 def get(self, request): 5 6 """其它邏輯""" 7 8 #celery處理的其它任務(異步處理),下面這個代碼,celery會去處理,django直接執行下面的其它邏輯 9 r = add.delay(x,y) 10 task_id = r.id 11 12 """其它邏輯""" 13 14 return JsonResponse({"data":"123"})View Code
- 通過task_id可以查詢celery異步處理完的結果
1 from test1.celery import app 2 status = app.AsyncResult(task_id).status 3 result = app.AsyncResult(task_id).result 4 5 #狀態有這幾種情況 6 CELERY_STATUS = { 7 ‘PENDING‘: ‘等待開始‘, 8 ‘STARTED‘: ‘任務開始‘, 9 ‘SUCCESS‘: ‘成功‘, 10 ‘FAILURE‘: ‘失敗‘, 11 ‘RETRY‘: ‘重試‘, 12 ‘REVOKED‘: ‘任務取消‘, 13 }View Code
服務器啟動celery worker(消費者)任務,和定時任務:
- 打開項目的虛擬環境,進入項目名稱目錄下,即跟app同一級目錄下運行:
- 定時任務:celery -A auto_model_platform worker -l info --beat
- worker任務:celery -A auto_model_platform worker -l info
celery+django+redis使用介紹