1. 程式人生 > 程式設計 >詳解Django中非同步任務之django-celery

詳解Django中非同步任務之django-celery

Celery文件參考:http://docs.jinkan.org/docs/celery/

參考文章:https://www.jb51.net/article/158046.htm

Django中非同步任務---django-celery

Celery簡單介紹:

celery使用場景:

  1. 耗時任務定時任務
  2. 請求結果不怎麼重要的
  • 耗時任務比如:傳送簡訊驗證碼我們可以先發送給客戶任務狀態(請求成功或失敗)
  • 請求結果重要的建議使用django實現 比如:支付

首先簡單介紹一下,Celery 是一個強大的分散式任務佇列,它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(asynctask)和定時任務(crontab)。它的架構組成如下圖

詳解Django中非同步任務之django-celery

Celery 主要包含以下幾個模組:

任務模組 Task

包含非同步任務和定時任務。其中,非同步任務通常在業務邏輯中被觸發併發往任務佇列,而定時任務由 Celery Beat 程序週期性地將任務發往任務佇列。

訊息中介軟體 Broker

Broker,即為任務排程佇列,接收任務生產者發來的訊息(即任務),將任務存入佇列。Celery 本身不提供佇列服務,官方推薦使用 RabbitMQ 和 Redis 等。

任務執行單元 Worker

Worker 是執行任務的處理單元,它實時監控訊息佇列,獲取佇列中排程的任務,並執行它。

任務結果儲存 Backend

Backend 用於儲存任務的執行結果,以供查詢。同訊息中介軟體一樣,儲存也可使用 RabbitMQ,Redis 和 MongoDB 等。

p>django-celery


首先需要統一一下使用的環境,以為如果redis的版本過高會報錯

詳解Django中非同步任務之django-celery

解決方法:建議降低redis版本

推薦版本

Django == 2.2.6

django-celery == 3.3.1

django-redis == 4.11.0

redis == 2.10.6

celery == 3.1.26.post2

依賴安裝:pip install .....詳解Django中非同步任務之django-celery人都知道

1.修改setting.py django配置檔案,增加如下:

import djcelery ###導包
djcelery.setup_loader() ###
BROKER_URL = 'redis://127.0.0.1:6379/2'
# BROKER_URL='redis://192.168.217.77:16379/2' #任何可用的redis都可以,不一定要在django server執行的主機上
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' ### 
INSTALLED_APPS = [
 ...
 "djcelery",# 加入djcelery應用
 ...
 
]
CELERY_TIMEZONE='Asia/Shanghai' #並沒有北京時區,與下面TIME_ZONE應該一致
TIME_ZONE='Asia/Shanghai' #

開頭增加如上配置檔案,根據實際情況配置redis的地址和埠,時區一定要設定為Asia/Shanghai。否則時間不準確回影響定時任務的執行。

上面程式碼首先匯出djcelery模組,並呼叫setup_loader方法載入有關配置;注意配置時區,不然預設使用UTC時間會比東八區慢8個小時。其中INSTALLED_APPS末尾新增兩項,分別表示新增celery服務和自己定義的apps服務。

2.建立Celery所需的資料表

python manage.py migrate
#如若不成功可以嘗試一下命令語句
#python manage.py syncdb

3.建立task

詳解Django中非同步任務之django-celery

stasks.py

# -*- coding: utf-8 -*-
import json,time
from syl.settings import ALY_ACCESSKEY_ID,ALY_ACCESSKEY_SECRET
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from celery import task
 
 
# 阿里雲簡訊驗證碼
 
@task
def Celery_Send_Sms(phone,data):
 client = AcsClient(ALY_ACCESSKEY_ID,ALY_ACCESSKEY_SECRET,'cn-hangzhou')
 request = CommonRequest()
 request.set_accept_format('json')
 request.set_domain('dysmsapi.aliyuncs.com')
 request.set_method('POST')
 request.set_protocol_type('https') # https | http
 request.set_version('2017-05-25')
 request.set_action_name('SendSms')
 request.add_query_param('RegionId',"cn-hangzhou")
 request.add_query_param('PhoneNumbers',phone)
 request.add_query_param('SignName',"美多商城")
 request.add_query_param('TemplateCode',"SMS_205397849")
 request.add_query_param('TemplateParam',data)
 response = client.do_action(request)
 time.sleep(10)
 print(str(response,encoding='utf-8'))
 res = json.loads(str(response,encoding='utf-8'))
 
#celery執行命令
# python manage.py celery worker --loglevel=info
  1. settings.py中的djcelery.setup_loader()執行時,Celery便會檢視所有INSTALLED_APPS中app目錄中的tasks.py檔案,找到標記為task的function,並將它們註冊為celery task.
  2. 在執行djcelery.setup_loader()時,task是以INSTALLED_APPS中的app名,加.tasks.function_name註冊的
  3. 一次需要注意 在impprt task時,需要保持一致
  4. 如果我們由於python path不同而使用不同的引用方式時(例如在tasks.py中使用from myproject.myapp.tasks import add形式),Celery將無法得知這是同一task,因此可能會引起奇怪的bug。

讓任務變成非同步

  例如我們希望在使用者發出request後非同步執行該task,馬上返回response,從而不阻塞該request,使使用者有一個流暢的訪問過程. 那麼,我們可以使用.delay。

詳解Django中非同步任務之django-celery

views.py

import re
import random
from rest_framework.permissions import AllowAny
from django_redis import get_redis_connection
from rest_framework.views import APIView
from rest_framework.response import Response
# from utils.MyBaseView import send_message,Send_Sms
from verifications.stasks import Celery_Send_Sms
 
# 使用者註冊簡訊驗證碼
class SmsCodeView(APIView):
 '''使用apiview的限流'''
 # 1. 所有人可以訪問
 permission_classes = (AllowAny,)
 
 def post(self,request):
  # 1. 獲取引數
  phone = request.data.get('phone') # 手機號
  image_code = request.data.get('image_code') # 字串驗證碼
  image_code_uuid = request.data.get('image_code_uuid') # 前端生成的uuid,是redis中圖片驗證碼的key
 
  # 2. 檢查引數
  if not all([phone,image_code,image_code_uuid]):
   return Response({'code': 400,'msg': '引數不全'})
 
  # 檢查手機號是否正確
  if not re.match(r'^1[3456789]\d{9}$',phone):
   return Response({"code": 999,"msg": "手機號碼不正確"})
 
  # 3. 檢查是否傳送
  redis_client = get_redis_connection('img_code') # 連線redis資料庫
  # phone_exists = redis_client.get(phone)
  # if phone_exists:
  #  return Response({"code": 999,"msg": "頻繁傳送,請稍後再試"})
 
  # 4.檢查圖片驗證碼是否合法
  redis_image_code = redis_client.get(image_code_uuid) # 字串驗證碼
  if redis_image_code:
   # bytes 轉成 string
   redis_image_code = redis_image_code.decode() # 把uuid解碼
 
  # 比較使用者提供的圖片內容是否和redis中儲存的一致
  if image_code.upper() != redis_image_code:
   return Response({'code': 999,'msg': '圖片驗證碼不正確'})
  # 5. 傳送
  code = '%06d' % random.randint(100000,999999) # 隨機6位驗證碼
  print('code===============================================',code)
  # 使用容聯雲簡訊驗證碼
  # send_resp = send_message(phone,(code,"5"))
 
  # 使用阿里雲簡訊驗證碼
  data = {'code': code}
  # send_resp = Send_Sms(phone,data)
  # 使用celery非同步傳送簡訊
  Celery_Send_Sms.delay(phone,data) #delay是註冊為celery非同步任務的關鍵點
  # Celery_Send_Sms(phone,data) # delay是註冊為celery非同步任務的關鍵點
 
  # 5.1 儲存code 到 redis中
  redis_client.setex(phone,60 * 5,code) # phone:code,5分鐘有效期
  # 5.2 從redis中刪除這個圖片驗證碼,以防再次被使用
  redis_client.delete(image_code_uuid)
 
  # 6.儲存這個已經發送驗證碼的手機號,防止頻繁傳送(使用pipeline 批量操作 )
  pl = redis_client.pipeline()
  pl.setex(phone,code)
  pl.delete(image_code_uuid)
  pl.execute()
  # 7. 返回結果
  return Response({"code": 200,"msg": "簡訊傳送成功"})

1.啟動celery

首先正常啟動你的django任務,然後啟動celery服務即可。

python manage.py celery worker --loglevel=info

詳解Django中非同步任務之django-celery

出現上圖這個報錯不讓超級管理員來啟動,在settings.py加入以下配置

from celery import Celery,platforms
platforms.C_FORCE_ROOT = True

2.驗證celery任務

  在搞定上面的東西以後,你就可以通過postman來請求介面讓介面使用celery來非同步執行任務而不阻塞你的request請求。

詳解Django中非同步任務之django-celery

到此這篇關於詳解Django中非同步任務之django-celery的文章就介紹到這了,更多相關Django非同步任務內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!