1. 程式人生 > >celery-分佈任務神器--django任務

celery-分佈任務神器--django任務

celery是一個分散式非同步框架,當我們有一個需求,比如我需要進行大批量的郵箱傳送,或者部落格訂閱推送的時候,會造成大量的等待執行,這時候就會用到celery,

Celery 介紹

在Celery中幾個基本的概念,需要先了解下,不然不知道為什麼要安裝下面的東西。概念:Broker、Backend。

什麼是broker?

broker是一個訊息傳輸的中介軟體,可以理解為一個郵箱。每當應用程式呼叫celery的非同步任務的時候,會向broker傳遞訊息,而後celery的worker將會取到訊息,進行對於的程式執行。好吧,這個郵箱可以看成是一個訊息佇列。其中Broker的中文意思是 經紀人 ,其實就是一開始說的 訊息佇列 ,用來發送和接受訊息。這個Broker有幾個方案可供選擇:RabbitMQ (訊息佇列),

Redis(快取資料庫),資料庫(不推薦),等等

什麼是backend?

通常程式傳送的訊息,發完就完了,可能都不知道對方時候接受了。為此,celery實現了一個backend,用於儲存這些訊息以及celery執行的一些訊息和結果。Backend是在Celery的配置中的一個配置項 CELERY_RESULT_BACKEND ,作用是儲存結果和狀態,如果你需要跟蹤任務的狀態,那麼需要設定這一項,可以是Database backend,也可以是Cache backend,具體可以參考這裡: CELERY_RESULT_BACKEND 。

對於 brokers,官方推薦是 rabbitmq 和 redis,至於 backend,就是資料庫。為了簡單可以都使用 redis。

我自己演示使用RabbitMQ作為Broker,用MySQL作為backend。

來一張圖,這是在網上最多的一張Celery的圖了,確實描述的非常好

這裡寫圖片描述

Celery的架構由三部分組成,訊息中介軟體(message broker),任務執行單元(worker)和任務執行結果儲存(task result store)組成。

訊息中介軟體

Celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,RabbitMQ, RedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的執行在分散式的系統節點中。

任務結果儲存

Task result store用來儲存Worker執行的任務的結果,Celery支援以不同方式儲存任務的結果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。

一、安裝

pip install celery redis 

二、編寫

#tasks.py

from celery import Celery
import time

app = Celery("task_transport", #任務名稱,可隨意填寫
            borker="redis://127.0.0.1:6379", #任務佇列,指定從哪裡排程,
            backend="redis://127.0.0.1:6379" #存放結果
        )

@app.task
def add(x,y):
    time.sleep(15)
    print('function is running')
    return x + y

三、啟動

celery  -A task worker -l info #task為指令碼名稱 worker為工作模式,info輸出日誌到螢幕

四、測試

#終端ipython
In [1]: import celery

In [2]: import task

In [3]: s = task.add.delay(10,20) #呼叫add函式的delay方法傳入引數

In [4]: s
Out[4]: <AsyncResult: c3e9c1d3-310f-46cf-9956-b3b897f10b7b>

In [5]: s.get() #獲取返回結果

五、專案模式

#celery_project/
    task2.py    
    celery.py   
    _pycache__ 

#celery.py

from __future__ import absolute_import,unicode_literals 
from celery import Celery 

app = Celery(
        "project",
        broker="redis://127.0.0.1:6379",
        backend="redis://127.0.0.1:6379",
        include=["task1","task2"]) #執行多個任務,這裡對應的是當前目錄的task1.py,task2.py

app.conf.update(
    result_expires=3600, #任務結果一小時內沒有獲取則過期
)
app.config_from_object('celery_app.celeryconfig') #通過Celery例項載入配置模組

if __name__ == '__main__':
    app.start()

#task1.py & task2.py

from __future__ import absolute_import,unicode_literals 
from .celery import app 

@app.task
def add(x,y):
    return x + y 

@app.task
def delete(x,y):
    return x - y 

#執行
celery -A celery_project worker -l info //celery_project為目錄名稱

#測試方法一樣,開啟另外一個終端呼叫task1或task2.py中的函式

celery常用方法

result = task.add.delay(10,20)  //呼叫函式和delay方法

result.get() //獲取結果,timeout預設為0

result.ready() //如果函式執行出現阻塞則返回False,可進行判斷!

result.get(propagate=False) // 如果出錯,獲取錯誤結果,不觸發異常

result.traceback //列印異常詳細結果

result.id //任務id

celery multi start name  project_name -l info //啟動celery任務並後臺執行

celery multi restart worker -A  celery_project //重新啟動celery任務

celery multi stop worker //停止

Celery定時任務

#task1.py

from __future__ import absolute_import,unicode_literals
from .celery import app //從主程式倒入
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender,**kwargs):

    #請注意,這裡的name不能相同!必須唯一,否則執行該方法!
    sender.add_periodic_task(5.0,delete.s(10,20),name="add every 10") //每五秒
    sender.add_periodic_task(5.0,add.s(2,20),name="add every 10")

    sender.add_periodic_task(
            crontab(hour=20,minute=15), //每天20點15
            add.s(200,5)
        )

@app.task
def add(x,y):
    return x + y

@app.task
def delete(x,y):
    return x - y 

#啟動worker
celery -A celery_project worker -l debug 

#啟動beat
celery -A celery_project.task1 beat -l debug 

基於配置檔案方式定義執行任務

app.conf.beat_schedule = {
    'add-every-30-seconds' : {
        'task': 'celery_project.task1.add',
        'schedule': 5.0,
        'args': (100,200)
    }
}

app.conf.timezone = 'UTC'

Django & Celery

pip install django-celery  //安裝django-celery

#需要和settings.py同級建立celery.py
#settings.py
INSTALL_APP -> djcelery //INSTALL_APP下加入djcelery

import djcelery

djcelery.setup_loader()

BROKER_URL = 'redis://127.0.0.1:6379' #佇列地址

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' #儲存結果

CELERY_TASK_SERIALIZER = 'json'

CELERY_RESULT_SERIALIZER = 'json'

CELERY_ACCEPT_CONTENT = ['json']

CELERY_IMPORTS = ('api.tasks', )

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' #是使用了django-celery預設的資料庫排程模型,任務執行週期都被存在你指定的orm資料庫中

CELERYD_CONCURRENCY = 20 #worker併發數

CELERY_TASK_RESULT_EXPIRES = 1200 #任務結果過期時間

CELERYD_PREFETCH_MULTIPLIER = 4 # celery worker 每次去redis取任務的數量

CELERYD_MAX_TASKS_PER_CHILD = 200 # 每個worker執行了多少任務就會死掉,我建議數量可以大一些,比如200

#celery.py -> 與settings.py同級

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Celery_Project.settings') 

app = Celery('Celery_Project')

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

#__init__.py

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

#{app_name}/tasks.py 可以在每一個app下建立tasks.py檔案進行celery任務編寫

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

#start
python manage.py migrate djcelery //生成資料表

python manage.py runserver  //啟動django

python manage.py  celery worker -c 6 -l debug //啟動celery