1. 程式人生 > 實用技巧 >Python celery原理及執行流程

Python celery原理及執行流程

目錄

1. Celery介紹

https://www.cnblogs.com/xiaonq/p/11166235.html#i1

1.1 celery應用舉例

  • Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理,如果你的業務場景中需要用到非同步任務,就可以考慮使用celery;
  • 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程式等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情;
  • Celery 在執行任務時需要通過一個訊息中介軟體來接收和傳送任務訊息,以及儲存任務結果, 一般使用rabbitMQ or Redis。

1.2 Celery有以下優點

  • 簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的;
  • 高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務;
  • 快速:一個單程序的celery每分鐘可處理上百萬個任務;
  • 靈活: 幾乎celery的各個元件都可以被擴充套件及自定製。

1.3 Celery 特性

  • 方便檢視定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
  • 可選 多程序, Eventlet 和 Gevent 三種模型併發執行.
  • Celery 是語言無關的.它提供了python 等常見語言的介面支援.

2. celery 元件

https://www.cnblogs.com/xiaonq/p/11166235.html#i2

1.1 Celery 扮演生產者和消費者的角色

  • Celery Beat : 任務排程器. Beat 程序會讀取配置檔案的內容, 週期性的將配置中到期需要執行的任務傳送給任務佇列.
  • Celery Worker : 執行任務的消費者, 通常會在多臺伺服器執行多個消費者, 提高執行效率.
  • Broker : 訊息代理, 佇列本身. 也稱為訊息中介軟體. 接受任務生產者傳送過來的任務訊息, 存進佇列再按序分發給任務消費方(通常是訊息佇列或者資料庫).
  • Producer : 任務生產者. 呼叫 Celery API , 函式或者裝飾器, 而產生任務並交給任務佇列處理的都是任務生產者.
  • Result Backend : 任務處理完成之後儲存狀態資訊和結果, 以供查詢.

1.2 celery架構圖

1.3 產生任務的方式

  • 釋出者釋出任務(WEB 應用)
  • 任務排程按期釋出任務(定時任務)

1.4 celery 依賴三個庫: 這三個庫, 都由 Celery 的開發者開發和維護.

  • billiard : 基於 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高效能和穩定性.
  • librabbitmp : C 語言實現的 Python 客戶端
  • kombu : Celery 自帶的用來收發訊息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高階藉口.

3.celery配置與基本使用

1.1 安裝celery

 pip install celery @ 
 https://github.com/celery/celery/tarball/master

1.2 新建celery/main.py配置celery

 # celery_task/main.py
 import os
 from celery import Celery

 # 定義celery例項, 需要的引數, 1, 例項名, 2, 任務釋出位置, 3, 結果儲存位置
 app = Celery('mycelery',

 broker='redis://127.0.0.1:6379/14',  # 任務存放的地方 

 backend='redis://127.0.0.1:6379/15')  # 結果存放的地方

 @app.task
 def add(x, y):
     return x + y

4.測試celery

1.1 啟動celery

 '''1.啟動celery'''
 #1.1 單程序啟動celery
 celery -A main worker -l INFO
 #1.2 celery管理
 celery  multi start celery_test -A celery_test 
 -l debug --autoscale=50,5        # celery併發
 數:最多50個,最少5個
 ps auxww|grep "celery worker"|grep -v grep|awk  
 '{print $2}'|xargs kill -9       # 關閉所有
 celery程序

5. 使用celery非同步傳送簡訊

1.1 在celery_task/mian.py中添加發送簡訊函式

 # celery專案中的所有導包地址, 都是以
 CELERY_BASE_DIR為基準設定.
 # 執行celery命令時, 也需要進入CELERY_BASE_DIR目錄執行.
 CELERY_BASE_DIR = 
 os.path.dirname(os.path.abspath(__file__))

 
 @app.task(bind=True)
 def send_sms_code(self, mobile, datas):
     sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../syl'))
     # 在方法中導包
     from libs.rl_sms import send_message
     # time.sleep(5)
     try:
         # 用 res 接收發送結果, 成功是:0, 失敗是:-1
         res = send_message(mobile, datas)
     except Exception as e:
         res = '-1'

     if res == '-1':
         # 如果傳送結果是 -1  就重試.
         self.retry(countdown=5, max_retries=3, exc=Exception('簡訊傳送失敗'))

1.2 在verifications/views.py中新增celery傳送簡訊試圖函式

 class SmsCodeView(APIView):
     """使用apiview的限流"""
     # 1. 所有人可以訪問
     permission_classes = (AllowAny,)

     def post(self, request):
         # 1. 獲取引數
         phone = request.data.get('phone')  # 手機號
         image_code = request.data.get('image_code')  # 圖片驗證碼
         image_code_uuid = request.data.get('image_code_uuid')  # 前端生成的uuid

         # 2. 檢查引數
         if not all([phone, image_code, image_code_uuid]):
             return Response({"code": 999, "msg": "引數不全"})
         if not re.match(r'^1[3456789]\d{9}$', phone):
             return Response({"code": 999, "msg": "手機號碼不正確"})

         # 3. 檢查是否傳送
         redis_client = get_redis_connection('img_code')
         phone_exists = redis_client.get(phone)
         if phone_exists:
             return Response({"code": 999, "msg": "頻繁傳送, 請稍後再試"})

         # 驗證圖形驗證碼
         redis_image_code = redis_client.get(image_code_uuid)  # bytes
         if redis_image_code:
             # bytes 轉成 string
             redis_image_code = redis_image_code.decode()

         # 比較使用者提供的圖片內容是否和redis中儲存的一致
         if image_code.upper() != redis_image_code:
             return Response({'code': 999, 'msg': '圖片驗證碼不正確'})

         # 4. 傳送
         code = '%06d' % random.randint(0, 999999)  # 隨機6位驗證碼
 
         from syl.settings import BASE_DIR
         sys.path.insert(0, os.path.join(BASE_DIR, '../celery_task'))
         from main import send_sms_code  # 必須這麼寫, 從main中導包

         send_sms_code.delay(phone, (code, "5"))
         print(code)

         # 5.使用 pipeline 批量操作
         pl = redis_client.pipeline()    # 例項化pipeline物件
         pl.setex(phone, 60 * 5, code)   # 儲存phone:code, 5分鐘有效期
         pl.delete(image_code_uuid)      # 從redis中刪除這個圖片驗證碼, 以防再次被使用
         pl.execute()

         # 6. 返回結果
         return Response({"code": 0, "msg": "簡訊傳送成功"})

1.3 新增路由

  xurlpatterns = [  
       path('sms_codes/', 
  views.SmsCodeView.as_view()),
  ]

6.測試介面

  • 介面URL
    http://192.168.56.100:8888/user/sms_codes/
  • 請求攜帶引數
 {  
     "phone": 18538752511,  
     "image_code":"aed3",          
                    # 前端生成的 圖形驗證碼              
     "image_code_uuid":"de8edce2-fc9f-11ea-9325-
005056c00008"      # 前端生成的uuid
 }