Python celery原理及執行流程
阿新 • • 發佈:2020-10-07
目錄
1.2 新建
1.1 在
1.2 在
1. Celery介紹
https://www.cnblogs.com/xiaonq/p/11166235.html#i1
1.1 celery應用舉例
- Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理,如果你的業務場景中需要用到非同步任務,就可以考慮使用celery;
- 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程式等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情;
- Celery 在執行任務時需要通過一個訊息中介軟體來接收和傳送任務訊息,以及儲存任務結果, 一般使用rabbitMQ or Redis。
1.2 Celery有以下優點
- 簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的;
- 高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務;
- 快速:一個單程序的celery每分鐘可處理上百萬個任務;
- 靈活: 幾乎celery的各個元件都可以被擴充套件及自定製。
1.3 Celery 特性
- 方便檢視定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
- 可選 多程序, Eventlet 和 Gevent 三種模型併發執行.
- Celery 是語言無關的.它提供了python 等常見語言的介面支援.
2. celery 元件
https://www.cnblogs.com/xiaonq/p/11166235.html#i2
1.1 Celery 扮演生產者和消費者的角色
- Celery Beat : 任務排程器. Beat 程序會讀取配置檔案的內容, 週期性的將配置中到期需要執行的任務傳送給任務佇列.
- Celery Worker : 執行任務的消費者, 通常會在多臺伺服器執行多個消費者, 提高執行效率.
- Broker : 訊息代理, 佇列本身. 也稱為訊息中介軟體. 接受任務生產者傳送過來的任務訊息, 存進佇列再按序分發給任務消費方(通常是訊息佇列或者資料庫).
- Producer : 任務生產者. 呼叫 Celery API , 函式或者裝飾器, 而產生任務並交給任務佇列處理的都是任務生產者.
- Result Backend : 任務處理完成之後儲存狀態資訊和結果, 以供查詢.
1.2 celery架構圖
1.3 產生任務的方式
- 釋出者釋出任務(WEB 應用)
- 任務排程按期釋出任務(定時任務)
1.4 celery 依賴三個庫: 這三個庫, 都由 Celery 的開發者開發和維護.
billiard
: 基於 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高效能和穩定性.librabbitmp :
C 語言實現的 Python 客戶端kombu :
Celery 自帶的用來收發訊息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高階藉口.
3.celery配置與基本使用
1.1 安裝celery
pip install celery @
https://github.com/celery/celery/tarball/master
1.2 新建celery/main.py
配置celery
# celery_task/main.py
import os
from celery import Celery
# 定義celery例項, 需要的引數, 1, 例項名, 2, 任務釋出位置, 3, 結果儲存位置
app = Celery('mycelery',
broker='redis://127.0.0.1:6379/14', # 任務存放的地方
backend='redis://127.0.0.1:6379/15') # 結果存放的地方
@app.task
def add(x, y):
return x + y
4.測試celery
1.1 啟動celery
'''1.啟動celery'''
#1.1 單程序啟動celery
celery -A main worker -l INFO
#1.2 celery管理
celery multi start celery_test -A celery_test
-l debug --autoscale=50,5 # celery併發
數:最多50個,最少5個
ps auxww|grep "celery worker"|grep -v grep|awk
'{print $2}'|xargs kill -9 # 關閉所有
celery程序
5. 使用celery非同步傳送簡訊
1.1 在celery_task/mian.py
中添加發送簡訊函式
# celery專案中的所有導包地址, 都是以
CELERY_BASE_DIR為基準設定.
# 執行celery命令時, 也需要進入CELERY_BASE_DIR目錄執行.
CELERY_BASE_DIR =
os.path.dirname(os.path.abspath(__file__))
@app.task(bind=True)
def send_sms_code(self, mobile, datas):
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../syl'))
# 在方法中導包
from libs.rl_sms import send_message
# time.sleep(5)
try:
# 用 res 接收發送結果, 成功是:0, 失敗是:-1
res = send_message(mobile, datas)
except Exception as e:
res = '-1'
if res == '-1':
# 如果傳送結果是 -1 就重試.
self.retry(countdown=5, max_retries=3, exc=Exception('簡訊傳送失敗'))
1.2 在verifications/views.py
中新增celery傳送簡訊試圖函式
class SmsCodeView(APIView):
"""使用apiview的限流"""
# 1. 所有人可以訪問
permission_classes = (AllowAny,)
def post(self, request):
# 1. 獲取引數
phone = request.data.get('phone') # 手機號
image_code = request.data.get('image_code') # 圖片驗證碼
image_code_uuid = request.data.get('image_code_uuid') # 前端生成的uuid
# 2. 檢查引數
if not all([phone, image_code, image_code_uuid]):
return Response({"code": 999, "msg": "引數不全"})
if not re.match(r'^1[3456789]\d{9}$', phone):
return Response({"code": 999, "msg": "手機號碼不正確"})
# 3. 檢查是否傳送
redis_client = get_redis_connection('img_code')
phone_exists = redis_client.get(phone)
if phone_exists:
return Response({"code": 999, "msg": "頻繁傳送, 請稍後再試"})
# 驗證圖形驗證碼
redis_image_code = redis_client.get(image_code_uuid) # bytes
if redis_image_code:
# bytes 轉成 string
redis_image_code = redis_image_code.decode()
# 比較使用者提供的圖片內容是否和redis中儲存的一致
if image_code.upper() != redis_image_code:
return Response({'code': 999, 'msg': '圖片驗證碼不正確'})
# 4. 傳送
code = '%06d' % random.randint(0, 999999) # 隨機6位驗證碼
from syl.settings import BASE_DIR
sys.path.insert(0, os.path.join(BASE_DIR, '../celery_task'))
from main import send_sms_code # 必須這麼寫, 從main中導包
send_sms_code.delay(phone, (code, "5"))
print(code)
# 5.使用 pipeline 批量操作
pl = redis_client.pipeline() # 例項化pipeline物件
pl.setex(phone, 60 * 5, code) # 儲存phone:code, 5分鐘有效期
pl.delete(image_code_uuid) # 從redis中刪除這個圖片驗證碼, 以防再次被使用
pl.execute()
# 6. 返回結果
return Response({"code": 0, "msg": "簡訊傳送成功"})
1.3 新增路由
xurlpatterns = [
path('sms_codes/',
views.SmsCodeView.as_view()),
]
6.測試介面
- 介面URL
http://192.168.56.100:8888/user/sms_codes/
- 請求攜帶引數
{
"phone": 18538752511,
"image_code":"aed3",
# 前端生成的 圖形驗證碼
"image_code_uuid":"de8edce2-fc9f-11ea-9325-
005056c00008" # 前端生成的uuid
}