4.非同步郵件傳送和定時任務
阿新 • • 發佈:2020-12-08
參考:https://www.cnblogs.com/xiaonq/p/14097376.html
1、建立tasks.py檔案進行驗證
from celery import Celery import time #測試 app = Celery('TASK', broker='redis://localhost', backend='redis://localhost') @app.task def add(x, y): print("running..add.", x, y) return x + y @app.tasktasks.pydef minus(x, y): time.sleep(60) print("running..minus.", x, y) return x - y
1.2啟動命令
celery -A tasks worker --loglevel=info # tasks是tasks.py檔案:必須在tasks.py所在目錄下執行
2.專案結構如下
2.1 opwf_project/celery_task資料夾
# -*- coding: utf-8 -*- # celery.py from celery import Celerycelery.pyimport os,sys import django # 1.新增django專案根路徑 CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf')) # 2.新增django環境 os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings") django.setup() # 讀取配置 # 3.celery基本配置 app = Celery('proj', broker='redis://localhost:6379/14', backend='redis://localhost:6379/15', include=['celery_task.tasks', ]) # 4.例項化時可以新增下面這個屬性 app.conf.update( result_expires=3600, #執行結果放到redis裡,一個小時沒人取就丟棄 ) # 5.配置定時任務:每5秒鐘執行 呼叫一次celery_pro下tasks.py檔案中的add函式 app.conf.beat_schedule = { 'add-every-5-seconds': { 'task': 'celery_task.tasks.test_task_crontab', 'schedule': 5.0, 'args': (16, 16) }, } # 6.新增時區配置 app.conf.timezone = 'Asia/Shanghai' print(dir(app)) if __name__ == '__main__': app.start()
from .celery import app import os,sys from .celery import CELERY_BASE_DIR '''測試定時任務''' @app.task() def test_task_crontab(x,y): '''新增django專案路徑''' sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../loonflow')) from utils.rl_sms import test_crontab res = test_crontab(x,y) return x + y @app.task(bind=True) def send_sms_code(self, mobile, datas): sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../loonflow')) # 在方法中導包 from utils.rl_sms import send_message # time.sleep(5) try: # 用 res 接收發送結果, 成功是:0, 失敗是:-1 res = send_message(mobile, datas) except Exception as e: res = '-1' if res == '-1': # 如果傳送結果是 -1 就重試. self.retry(countdown=5, max_retries=3, exc=Exception('簡訊傳送失敗'))tasks.py
3.1utils.py/rl_sms.py資料夾
from ronglian_sms_sdk import SmsSDK from user.models import User accId = '8aaf070875774c6d01758d48effb0a36' accToken = '10ccc33de038428d864cb5eb122649f6' appId = '8aaf070875774c6d01758d48f0cd0a3d' def send_message(mobile, datas): user = User.objects.all()[0] print(user.username, '%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') sdk = SmsSDK(accId, accToken, appId) tid = '1' # 測試模板id為: 1. 內容為: 【雲通訊】您的驗證碼是{1},請於{2}分鐘內正確輸入。 # mobile = '13303479527' # datas = ('666777', '5') # 模板中的引數按照位置傳遞 resp = sdk.sendMessage(tid, mobile, datas) print("##########################################") print('執行了這個方法 send_message') return resp def test_crontab(x,y): print('############### 執行test_crontab測試任務 #############') print('############### 郵件審批超時提醒 #############')rl_sms.py