1. 程式人生 > >celery+django+redis使用介紹

celery+django+redis使用介紹

revoke sync bind __init__ ons bubuko redis class ucc

版本:

  • celery==3.1.25
  • django==1.8.6
  • redis==2.10.6

安裝:

  • 進入虛擬環境(虛擬環境創建不同,進入方式不同)
  • pip install celery==3.1.25(以celery安裝為例,其他安裝方式相同)

運行環境:

  • window10(celery4.0以後不支持windows)
  • linux

目錄結構:
技術分享圖片

  • 最外面的test1為項目名稱
  • 裏面的test1與app同級目錄,裏面都是一些配置(其中celery.py放在其中)。
  • app2為項目中的一個app
  • tasks.py在app2中,這個tasks.py只針對app2使用。若其他app也有celery任務。建立同樣的文件名即可。

運行celery,需要的幾個文件:

  • test1下的celery.py,代碼如下(裏面有介紹):
技術分享圖片
 1 from __future__ import absolute_import, unicode_literals
 2 import os
 3 from django.conf import settings
 4 from celery import Celery
 5 
 6 
 7 #設置 Django 的配置文件
 8 os.environ.setdefault(DJANGO_SETTINGS_MODULE, test.settings)
 9 
10 # 創建 celery 實例
11 app = Celery(test1) 12 13 # Using a string here means the worker will not have to 14 # pickle the object when using Windows. 15 app.config_from_object(django.conf:settings) 16 17 # 搜索所有 app 中的 tasks 18 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 19 20 @app.task(bind=True) 21 def
debug_task(self): 22 print(Request: {0!r}.format(self.request))
View Code
  • test1下的settings.py,代碼如下:
技術分享圖片
 1 """原來的django的settings內容"""
 2 
 3 #celery config
 4 #消息中間件(使用redis),消息代理,用於發布者傳遞消息給消費者
 5 BROKER_URL = redis://127.0.0.1:6379
 6 #消息結果返回中間件(使用redis),用於存儲任務執行結果
 7 CELERY_RESULT_BACKEND = redis://127.0.0.1:6379
 8 #允許的內容類型,
 9 CELERY_ACCEPT_CONTENT = [json]
10 #任務的序列化方式
11 CELERY_TASK_SERIALIZER = json
12 #任務結果的序列化方式
13 CELERY_RESULT_SERIALIZER = json
14 #celery時區,定時任務使用
15 CELERY_TIMEZONE = Asia/Shanghai
16 from datetime import timedelta
17 #定時任務處理,使用的schedule,裏面的task,寫上任務的“路徑”,schedule設置時間,args設置參數。
18 CELERYBEAT_SCHEDULE = {
19 add_every_10_seconds: {
20 task: app2.tasks.add,
21 schedule: timedelta(seconds=10),
22 args: (4,4)
23 },
24 }
View Code
  • test1下的__init__.py,代碼如下:

技術分享圖片
1 from __future__ import absolute_import
2 from .celery import app as celery_app
3 # 這是為了確保在django啟動時啟動 celery
View Code
  • app2下的tasks.py,這個tasks文件只針對該app2使用,其他app可以根據實際情況添加tasks.py文件,裏面寫相應的任務代碼。代碼如下:
技術分享圖片
1 # -*- coding:utf-8 -*-
2 from __future__ import absolute_import
3 from auto_model_platform.celery import app
4 
5 @app.task
6 def add(x, y):
7     time.sleep(30)
8     print("running...", x, y)
9     return x + y
View Code

調用tasks.py內的函數:

  • 在app2.views.py中的某個視圖函數中直接調用,比如調用tasks.py中的add(x,y)函數,這樣調用。代碼如下:
技術分享圖片
 1 from app2.tasks import add
 2 
 3 class TestView(View):
 4      def get(self, request):
 5 
 6     """其它邏輯"""
 7 
 8     #celery處理的其它任務(異步處理),下面這個代碼,celery會去處理,django直接執行下面的其它邏輯
 9     r = add.delay(x,y)
10     task_id = r.id
11 
12      """其它邏輯"""
13 
14     return JsonResponse({"data":"123"})
View Code
  • 通過task_id可以查詢celery異步處理完的結果
技術分享圖片
 1 from test1.celery import app
 2 status = app.AsyncResult(task_id).status
 3 result = app.AsyncResult(task_id).result
 4 
 5 #狀態有這幾種情況
 6 CELERY_STATUS = {
 7     PENDING: 等待開始,
 8     STARTED: 任務開始,
 9     SUCCESS: 成功,
10     FAILURE: 失敗,
11     RETRY: 重試,
12     REVOKED: 任務取消,
13 }
View Code

服務器啟動celery worker(消費者)任務,和定時任務:

  • 打開項目的虛擬環境,進入項目名稱目錄下,即跟app同一級目錄下運行:
  • 定時任務:celery -A auto_model_platform worker -l info --beat
  • worker任務:celery -A auto_model_platform worker -l info

celery+django+redis使用介紹