celery-分佈任務神器--django任務
celery
是一個分散式非同步框架,當我們有一個需求,比如我需要進行大批量的郵箱傳送,或者部落格訂閱推送的時候,會造成大量的等待執行,這時候就會用到celery
,
Celery 介紹
在Celery中幾個基本的概念,需要先了解下,不然不知道為什麼要安裝下面的東西。概念:Broker、Backend。
什麼是broker?
broker是一個訊息傳輸的中介軟體,可以理解為一個郵箱。每當應用程式呼叫celery的非同步任務的時候,會向broker傳遞訊息,而後celery的worker將會取到訊息,進行對於的程式執行。好吧,這個郵箱可以看成是一個訊息佇列。其中Broker的中文意思是 經紀人 ,其實就是一開始說的 訊息佇列 ,用來發送和接受訊息。這個Broker有幾個方案可供選擇:RabbitMQ (訊息佇列),
什麼是backend?
通常程式傳送的訊息,發完就完了,可能都不知道對方時候接受了。為此,celery實現了一個backend,用於儲存這些訊息以及celery執行的一些訊息和結果。Backend是在Celery的配置中的一個配置項 CELERY_RESULT_BACKEND ,作用是儲存結果和狀態,如果你需要跟蹤任務的狀態,那麼需要設定這一項,可以是Database backend,也可以是Cache backend,具體可以參考這裡: CELERY_RESULT_BACKEND 。
對於 brokers,官方推薦是 rabbitmq 和 redis,至於 backend,就是資料庫。為了簡單可以都使用 redis。
我自己演示使用RabbitMQ作為Broker,用MySQL作為backend。
來一張圖,這是在網上最多的一張Celery的圖了,確實描述的非常好
Celery的架構由三部分組成,訊息中介軟體(message broker),任務執行單元(worker)和任務執行結果儲存(task result store)組成。
訊息中介軟體
Celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任務執行單元
Worker是Celery提供的任務執行的單元,worker併發的執行在分散式的系統節點中。
任務結果儲存
Task result store用來儲存Worker執行的任務的結果,Celery支援以不同方式儲存任務的結果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。
一、安裝
pip install celery redis
二、編寫
#tasks.py
from celery import Celery
import time
app = Celery("task_transport", #任務名稱,可隨意填寫
borker="redis://127.0.0.1:6379", #任務佇列,指定從哪裡排程,
backend="redis://127.0.0.1:6379" #存放結果
)
@app.task
def add(x,y):
time.sleep(15)
print('function is running')
return x + y
三、啟動
celery -A task worker -l info #task為指令碼名稱 worker為工作模式,info輸出日誌到螢幕
四、測試
#終端ipython
In [1]: import celery
In [2]: import task
In [3]: s = task.add.delay(10,20) #呼叫add函式的delay方法傳入引數
In [4]: s
Out[4]: <AsyncResult: c3e9c1d3-310f-46cf-9956-b3b897f10b7b>
In [5]: s.get() #獲取返回結果
五、專案模式
#celery_project/
task2.py
celery.py
_pycache__
#celery.py
from __future__ import absolute_import,unicode_literals
from celery import Celery
app = Celery(
"project",
broker="redis://127.0.0.1:6379",
backend="redis://127.0.0.1:6379",
include=["task1","task2"]) #執行多個任務,這裡對應的是當前目錄的task1.py,task2.py
app.conf.update(
result_expires=3600, #任務結果一小時內沒有獲取則過期
)
app.config_from_object('celery_app.celeryconfig') #通過Celery例項載入配置模組
if __name__ == '__main__':
app.start()
#task1.py & task2.py
from __future__ import absolute_import,unicode_literals
from .celery import app
@app.task
def add(x,y):
return x + y
@app.task
def delete(x,y):
return x - y
#執行
celery -A celery_project worker -l info //celery_project為目錄名稱
#測試方法一樣,開啟另外一個終端呼叫task1或task2.py中的函式
celery常用方法
result = task.add.delay(10,20) //呼叫函式和delay方法
result.get() //獲取結果,timeout預設為0
result.ready() //如果函式執行出現阻塞則返回False,可進行判斷!
result.get(propagate=False) // 如果出錯,獲取錯誤結果,不觸發異常
result.traceback //列印異常詳細結果
result.id //任務id
celery multi start name project_name -l info //啟動celery任務並後臺執行
celery multi restart worker -A celery_project //重新啟動celery任務
celery multi stop worker //停止
Celery定時任務
#task1.py
from __future__ import absolute_import,unicode_literals
from .celery import app //從主程式倒入
from celery.schedules import crontab
@app.on_after_configure.connect
def setup_periodic_tasks(sender,**kwargs):
#請注意,這裡的name不能相同!必須唯一,否則執行該方法!
sender.add_periodic_task(5.0,delete.s(10,20),name="add every 10") //每五秒
sender.add_periodic_task(5.0,add.s(2,20),name="add every 10")
sender.add_periodic_task(
crontab(hour=20,minute=15), //每天20點15
add.s(200,5)
)
@app.task
def add(x,y):
return x + y
@app.task
def delete(x,y):
return x - y
#啟動worker
celery -A celery_project worker -l debug
#啟動beat
celery -A celery_project.task1 beat -l debug
基於配置檔案方式定義執行任務
app.conf.beat_schedule = {
'add-every-30-seconds' : {
'task': 'celery_project.task1.add',
'schedule': 5.0,
'args': (100,200)
}
}
app.conf.timezone = 'UTC'
Django & Celery
pip install django-celery //安裝django-celery
#需要和settings.py同級建立celery.py
#settings.py
INSTALL_APP -> djcelery //INSTALL_APP下加入djcelery
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379' #佇列地址
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' #儲存結果
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_IMPORTS = ('api.tasks', )
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' #是使用了django-celery預設的資料庫排程模型,任務執行週期都被存在你指定的orm資料庫中
CELERYD_CONCURRENCY = 20 #worker併發數
CELERY_TASK_RESULT_EXPIRES = 1200 #任務結果過期時間
CELERYD_PREFETCH_MULTIPLIER = 4 # celery worker 每次去redis取任務的數量
CELERYD_MAX_TASKS_PER_CHILD = 200 # 每個worker執行了多少任務就會死掉,我建議數量可以大一些,比如200
#celery.py -> 與settings.py同級
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Celery_Project.settings')
app = Celery('Celery_Project')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#__init__.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
#{app_name}/tasks.py 可以在每一個app下建立tasks.py檔案進行celery任務編寫
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
#start
python manage.py migrate djcelery //生成資料表
python manage.py runserver //啟動django
python manage.py celery worker -c 6 -l debug //啟動celery