django+celery+redis應用
一、celery介紹
1、應用場景
a. Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理,如果你的業務場景中需要用到非同步任務,就可以考慮使用celery
b.你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程式等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情
c.Celery 在執行任務時需要通過一個訊息中介軟體來接收和傳送任務訊息,以及儲存任務結果, 一般使用rabbitMQ or Redis
2、redis的優點
a.簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的
b.高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務
c.快速:一個單程序的celery每分鐘可處理上百萬個任務
d.靈活: 幾乎celery的各個元件都可以被擴充套件及自定製
4、celery的特性
1)方便檢視定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
2)可選 多程序, Eventlet 和 Gevent 三種模型併發執行.
3)Celery 是語言無關的.它提供了python 等常見語言的介面支援.
二、celery元件
1、celery扮演生產者和消費者的角色
Celery Beat :任務排程器. Beat 程序會讀取配置檔案的內容, 週期性的將配置中到期需要執行的任務傳送給任務佇列.
Celery Worker :執行任務的消費者, 通常會在多臺伺服器執行多個消費者, 提高執行效率.
Broker :訊息代理, 佇列本身. 也稱為訊息中介軟體. 接受任務生產者傳送過來的任務訊息, 存進佇列再按序分發給任務消費方(通常是訊息佇列或者資料庫).
Producer :任務生產者. 呼叫 Celery API , 函式或者裝飾器, 而產生任務並交給任務佇列處理的都是任務生產者.
Result Backend :任務處理完成之後儲存狀態資訊和結果, 以供查詢.
3、產生任務的方式
1) 釋出者釋出任務(WEB 應用)
2)任務排程按期釋出任務(定時任務)
4、celery依賴的三個庫:這三個庫,都是由celery的開發者發開和維護
billiard :基於 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高效能和穩定性.
librabbitmp :C 語言實現的 Python 客戶端
kombu :Celery 自帶的用來收發訊息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高階藉口.
三、celery的使用
安裝相關依賴包
pip3 install Django==2.0.4 pip3 install celery==4.3.0 pip3 install redis==3.2.1 pip3 install django-celery==3.1.17 pip3 install ipython==7.6.1
在與專案同名的目錄下建立celery.py
# -*- coding: utf-8 -*- from __future__ import absolute_import import os from celery import Celery # 只要是想在自己的指令碼中訪問Django的資料庫等檔案就必須配置Django的環境變數 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings') # app名字 app = Celery('celery_test') # 配置celery class Config: BROKER_URL = 'redis://192.168.56.11:6379' CELERY_RESULT_BACKEND = 'redis://192.168.56.11:6379' app.config_from_object(Config) # 到各個APP裡自動發現tasks.py檔案 app.autodiscover_tasks() celery.py
在與專案同名的目錄下的init.py 檔案中新增下面內容
# -*- coding:utf8 -*- from __future__ import absolute_import, unicode_literals # 告訴Django在啟動時別忘了檢測我的celery檔案 from .celery import app as celery_ap __all__ = ['celery_app'] __init__.py
建立app01/tasks.py檔案
# -*- coding:utf8 -*- from __future__ import absolute_import, unicode_literals from celery import shared_task import time # 這裡不再使用@app.task,而是用@shared_task,是指定可以在其他APP中也可以呼叫這個任務 @shared_task def add(x,y): print('########## running add #####################') return x + y @shared_task def minus(x,y): time.sleep(30) print('########## running minus #####################') return x - y app01/tasks.py
保證啟動了redis-server
啟動一個celery的worker
celery multi start w1 w2 -A celery_pro -l info #一次性啟動w1,w2兩個worker celery -A celery_pro status #檢視當前有哪些worker在執行 celery multi stop w1 w2 -A celery_pro #停止w1,w2兩個worker celery multi start celery_test -A celery_test -l debug --autoscale=50,5 # celery併發數:最多50個,最少5個 ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9 # 關閉所有celery程序
測試celery
./manage.py shell import tasks t1 = tasks.minus.delay(5,3) t2 = tasks.add.delay(3,4) t1.get() t2.get()