1. 程式人生 > 實用技巧 >celery+redis應用+django

celery+redis應用+django

一、celery介紹

  1、應用場景

    a. Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理,如果你的業務場景中需要用到非同步任務,就可以考慮使用celery

    b.你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程式等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情

    c.Celery 在執行任務時需要通過一個訊息中介軟體來接收和傳送任務訊息,以及儲存任務結果, 一般使用rabbitMQ or Redis

  2、redis的優點

    a.簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的

    b.高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務

    c.快速:一個單程序的celery每分鐘可處理上百萬個任務

    d.靈活: 幾乎celery的各個元件都可以被擴充套件及自定製

  3. celery的工作流程

    

    user:使用者程式,用於告知celery去執行一個任務。
    broker:存放任務(依賴RabbitMQ或Redis,進行儲存)
    worker:執行任務

  4、celery的特性

      1)方便檢視定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.

      2)可選 多程序, Eventlet 和 Gevent 三種模型併發執行.

      3)Celery 是語言無關的.它提供了python 等常見語言的介面支援.

二、celery元件

  1、celery扮演生產者和消費者的角色

      Celery Beat :任務排程器. Beat 程序會讀取配置檔案的內容, 週期性的將配置中到期需要執行的任務傳送給任務佇列.

      Celery Worker :執行任務的消費者, 通常會在多臺伺服器執行多個消費者, 提高執行效率.

      Broker :訊息代理, 佇列本身. 也稱為訊息中介軟體. 接受任務生產者傳送過來的任務訊息, 存進佇列再按序分發給任務消費方(通常是訊息佇列或者資料庫).

      Producer :任務生產者. 呼叫 Celery API , 函式或者裝飾器, 而產生任務並交給任務佇列處理的都是任務生產者.

      Result Backend :任務處理完成之後儲存狀態資訊和結果, 以供查詢.

  2、celery架構圖

      

  3、產生任務的方式

      1) 釋出者釋出任務(WEB 應用)

      2)任務排程按期釋出任務(定時任務)

  4、celery依賴的三個庫:這三個庫,都是由celery的開發者發開和維護

      billiard :基於 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高效能和穩定性.

      librabbitmp :C 語言實現的 Python 客戶端

      kombu :Celery 自帶的用來收發訊息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高階藉口.

三、celery的使用

  安裝相關依賴包

pip3 install Django==2.0.4
pip3 install celery==4.3.0
pip3 install redis==3.2.1
pip3 install  django-celery==3.1.17
pip3 install ipython==7.6.1 

  在與專案同名的目錄下建立celery.py

# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
from celery import Celery

# 只要是想在自己的指令碼中訪問Django的資料庫等檔案就必須配置Django的環境變數
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')

# app名字
app = Celery('celery_test')

# 配置celery
class Config:
    BROKER_URL = 'redis://192.168.56.11:6379'
    CELERY_RESULT_BACKEND = 'redis://192.168.56.11:6379'

app.config_from_object(Config)
# 到各個APP裡自動發現tasks.py檔案
app.autodiscover_tasks()
celery.py

  在與專案同名的目錄下的init.py 檔案中新增下面內容

# -*- coding:utf8 -*-
from __future__ import absolute_import, unicode_literals

# 告訴Django在啟動時別忘了檢測我的celery檔案
from .celery import app as celery_ap
__all__ = ['celery_app']
__init__.py
__init__.py

  建立app01/tasks.py檔案

# -*- coding:utf8 -*-
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

# 這裡不再使用@app.task,而是用@shared_task,是指定可以在其他APP中也可以呼叫這個任務
@shared_task
def add(x,y):
    print('########## running add #####################')
    return x + y

@shared_task
def minus(x,y):
    time.sleep(30)
    print('########## running minus #####################')
    return x - y
app01/tasks.py

  保證啟動了redis-server

  啟動一個celery的worker

celery multi start w1 w2 -A celery_pro -l info     #一次性啟動w1,w2兩個worker
celery -A celery_pro status                        #檢視當前有哪些worker在執行
celery multi stop w1 w2 -A celery_pro              #停止w1,w2兩個worker

celery  multi start celery_test -A celery_test -l debug --autoscale=50,5        # celery併發數:最多50個,最少5個
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9       # 關閉所有celery程序
celery/worker

  測試celery

./manage.py shell
import tasks
t1 = tasks.minus.delay(5,3)
t2 = tasks.add.delay(3,4)
t1.get()
t2.get()
測試celery