1. 程式人生 > 實用技巧 >Django-celery非同步傳送資訊(3)

Django-celery非同步傳送資訊(3)

1.使用celery非同步傳送簡訊

1.1在celery_task/main.py中添加發送簡訊函式

# celery專案中的所有導包地址,都是以CELERY_BASE_DIR為基準設定。
#執行celery命令時,也需要進入CELERY_BASE_DIR目錄執行。
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))

@app.task(bind=True)
def send_sms_code(self,mobile,datas):
    sys.path.insert(0,os.path.join(CELERY_BASE_DIR,'../SYL'))
    #在方法中導包
    from libs.rl_sms import send_message
    # time.sleep(5)
    try:
        # 用res 接受傳送結果,成功是:0,失敗時:-1
        res = '-1'
        
    if res == '-1' :
        #如果傳送結果-1 就是重試。
        self.retry(countdown=5,max_retries=3,exc=Exception('簡訊傳送失敗'))

1.2在vifications/views.py中新增celery中傳送簡訊檢視函式

class SmsCodeView(APIView):
    """使用apiview的限流"""
    # 1.所有人可以訪問
    permission_classes = (AllowAny,)
    
    def post(self,request):
        # 1.獲取引數
        phone = request.data.get('phone') # 手機號
        imgage_code = request.data.get('image_code') # 圖片驗證碼
        imgage_code_uuid = request.data.get('imgage_code_uuid') #前端生成的uuid
        
        #2.檢查引數
        if not all([phone,image_code,image_code_uuid]):
            return Response({'code':999,"msg":"引數不全"})
        if not re.macth(r"^1[3456789]\d{9}$",phone)
        	return Response({"code":999,"msg":"手機號碼不正確"})
        
        # 3.檢查是否傳送
        redis_clent = get_redis_connection("img_code")
        phone_exists = redis_celent.get(phone)
        if phone_exists:
            return Response({"code":999,"msg":"頻繁傳送,請稍後再試"})
        
        #驗證圖形驗證碼
        redis_image_code = redis_client.get(image_code_uuid) # bytes
        if redis_image_code:
            # bytes 轉成 string
            redis_imgage_code = redis_image_code.decode()
            
        # 比較使用者提供的圖片內容是否和redis中儲存的一致
        if image_code.upper() != redis_image_code:
            return Response({'code':999,"msg":"圖片驗證碼不正確"})
        
        #4.傳送
        code = '%06d' % random.randint(0,999999) # 隨機6位驗證碼
        
        from syl.settings import BASE_DIR
        sys.path.insert(0,os.path.join(BASE_DIR,'../celery_task'))
        from main import send_sms_code # 必須這麼寫,從main中導包
        
        send_sms_code.delay(phone,(code,"5"))
        print(code)
        
        #5.使用pipeline操作
        pl = redis_client.pipeline() #例項化pipeline物件
        pl.setex(phone,60*5,code) #儲存phone:code ,5分鐘有效期
        pl.delete(image_code_uuid) #從redis中刪除這個圖片驗證碼,以防再次被使用
        pl.execute()
        
        # 6.返回結果
        return Response({"code":0,"msg":"簡訊傳送成"})

1.3 新增路由

urlpatterns = [
    path('sms_codes/', views.SmsCodeView.as_view()),
]

2.測試介面

  • 介面URL
http://192.168.56.100:8888/user/sms_codes/
  • 請求攜帶引數
{
    "phone": 18538752511,
    "image_code":"aed3",                                         # 前端生成的 圖形驗證碼
    "image_code_uuid":"de8edce2-fc9f-11ea-9325-005056c00008"     # 前端生成的uuid
}