celery在專案中使用和Django中使用
阿新 • • 發佈:2020-12-07
1.1 為什麼有這篇部落格
1.部落格說明
1.在做celery非同步任務和定時任務時,有些人使用django-celery+django-redis+celery+redis+django-celery-beat實現
2.但是這種實現方法和django結合過於緊密,不利於分散式部署
3.而且不同版本相結合,一旦不小心安裝升級一個包,會導致各種報錯
4.配置也比較繁瑣,很多同學在使用時易出錯
2、安裝相關包
pip install Django==2.2 pip install celery==4.4.7 pip install redis==3.5.3
1.2 celery基本使用
1、建立tasks.py檔案進行驗證
from celery import Celery import time app = Celery('TASK', broker='redis://localhost', backend='redis://localhost') @app.task def add(x, y): print("running..add.", x, y) return x + y @app.task def minus(x, y): time.sleep(tasks.py60) print("running..minus.", x, y) return x - y
1.1、啟動Celery Worker來開始監聽並執行任務
celery -A tasks worker --loglevel=info# tasks是tasks.py檔案:必須在tasks.py所在目錄下執行
1.2、呼叫任務:再開啟兩個終端,進行命令列模式,呼叫任務
>>> import tasks >>> import tasks >>> t2 = tasks.minus.delay(9,11)#然後在另一個終端重複上面步驟執行 >>> t1 = tasks.add.delay(3,4) >>> t1.get() #由於t2執行sleep了3s所以t1.get()需要等待
1.3、celery其他命令
>>> t.ready() #返回true證明可以執行,不必等待 >>> t.get(timeout=1) #如果1秒不返回結果就超時,避免一直等待 >>> t.get(propagate=False) #如果執行的程式碼錯誤只會列印錯誤資訊 >>> t.traceback #列印異常詳細結果
1.2 在django專案中使用
1.2.1、目錄結構如下
1.2.2、opwf_project/celery_task資料夾
# celery.py # -*- coding: utf-8 -*- from celery import Celery import os,sys import django # 1.新增django專案根路徑 CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf')) # 2.新增django環境 os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings") django.setup() # 讀取配置 # 3.celery基本配置 app = Celery('proj', broker='redis://localhost:6379/14', backend='redis://localhost:6379/15', include=['celery_task.tasks', 'celery_task.tasks2', ]) # 4.例項化時可以新增下面這個屬性 app.conf.update( result_expires=3600, #執行結果放到redis裡,一個小時沒人取就丟棄 ) # 5.配置定時任務:每5秒鐘執行 呼叫一次celery_pro下tasks.py檔案中的add函式 app.conf.beat_schedule = { 'add-every-5-seconds': { 'task': 'celery_task.tasks.test_task_crontab', 'schedule': 5.0, 'args': (16, 16) }, } # 6.新增時區配置 app.conf.timezone = 'UTC' if __name__ == '__main__': app.start()celery.py
# -*- coding: utf-8 -*- from celery import Celery import os,sys import django # 1.新增django專案根路徑 CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf')) # 2.新增django環境 os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings") django.setup() # 讀取配置 # 3.celery基本配置 app = Celery('proj', broker='redis://localhost:6379/14', backend='redis://localhost:6379/15', include=['celery_task.tasks', 'celery_task.tasks2', ]) # 4.例項化時可以新增下面這個屬性 app.conf.update( result_expires=3600, #執行結果放到redis裡,一個小時沒人取就丟棄 ) # 5.配置定時任務:每5秒鐘執行 呼叫一次celery_pro下tasks.py檔案中的add函式 app.conf.beat_schedule = { 'add-every-5-seconds': { 'task': 'celery_task.tasks.test_task_crontab', 'schedule': 5.0, 'args': (16, 16) }, } # 6.新增時區配置 app.conf.timezone = 'UTC' if __name__ == '__main__': app.start()tasks.py
# -*- coding:utf8 -*- from .celery import app import time,random @app.task def randnum(start,end): time.sleep(3) return random.randint(start,end)tasks2.py
1.2.3、opwf_project/opwf/utils
# -*- coding: utf-8 -*- # utils/rl_sms.py from ronglian_sms_sdk import SmsSDK from user.models import User accId = '8a216da8747ac98201749c0de38723b7' accToken = '86072b540b4648229b27400414150ef2' appId = '8a216da8747ac98201749c0de45123be' def send_message(phone, datas): user = User.objects.all()[0] print(user.username, '%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') sdk = SmsSDK(accId, accToken, appId) tid = '1' # 測試模板id為: 1. 內容為: 【雲通訊】您的驗證碼是{1},請於{2}分鐘內正確輸入。 # mobile = '13303479527' # datas = ('666777', '5') # 模板中的引數按照位置傳遞 # resp = sdk.sendMessage(tid, phone, datas) print("##########################################") print('執行了這個方法 send_message') return '' def test_crontab(x,y): print('############### 執行test_crontab測試任務 #############') print('############### 郵件審批超時提醒 #############')rl_sms.py
1.2.4、在django專案中呼叫
# 1.匯入任務 from celery_task import tasks # 2.執行非同步任務 tasks.send_sms_code.delay(18538752511,())