Celery分布式任務
Celery分布式任務
celery call把任務給一個組件,組件交給rabiitmq放到隊列broker,隊列返回任務id給celery組件再給call,
任務完成時call拿著id通過celery去rabbitmq取。broker發任務給worker
1.Celery有以下優點:
簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的
高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
快速:一個單進程的celery每分鐘可處理上百萬個任務
靈活: 幾乎celery的各個組件都可以被擴展及自定制
2.Celery安裝使用
Celery的默認broker是RabbitMQ, 僅需配置一行就可以
broker_url = ‘amqp://guest:guest@localhost:5672//‘
使用Redis做broker也可以
安裝redis組件
pip install -U "celery[redis]"
app.conf.broker_url = ‘redis://localhost:6379/0‘ redis://:password@hostname:port/db_number
配置一下把任務結果存在哪:app.conf.result_backend = ‘redis://localhost:6379/0‘
3.開始使用Celery啦
pip install celery
軟鏈:ln -s /usr/local/python3/bin/celery /usr/bin/celery
移除軟鏈:rm + name
4.簡單任務
創建一個任務文件
from celery import Celery
app = Celery(‘tasks‘, # app的名字
broker=‘redis://:[email protected]:6379/0‘)
# backend=‘redis://localhost‘)
@app.task
def add(x, y): # 加裝飾器,這是worker可以執行的一個任務
print("running...", x, y)
return x + y
啟動Celery Worker來開始監聽並執行任務
celery -A tasks worker --loglevel=info
註:window上報錯,pip install eventlet celery -A <mymodule> worker -l info -P eventlet
調用任務
再打開一個終端, 進行命令行模式,調用任務
from tasks import add
add.delay(4, 4)
result = add.delay(4, 4) #賦值變量後能接收值
發布任務後,多個worker時只能由其中一個搶到並執行,linux和window的worker一樣
任務狀態:
result.ready()
取結果時,設置超時時間
result.get(timeout=1) #如果還沒處理完,會報錯
propagate 參數覆蓋get的報錯,而是返回任務的報錯
result.get(propagate=False)
回撤
result.traceback
二、在項目中使用celery
1.目錄格式
proj/__init__.py
/celery.py #配置連接
/tasks.py
2. proj/celery.py內容
from __future__ import absolute_import, unicode_literals
# 默認導入當前同名文件,這裏導入後從python包的絕對路徑導入,支持unicode兼容
from celery import Celery
app = Celery(‘proj‘,
broker=‘redis://:[email protected]:6379/0‘,
backend=‘redis://:[email protected]:6379/0‘,
include=[‘celery_project.tasks‘]) # 要執行的任務文件模板位置
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == ‘__main__‘:
app.start()
3. proj/tasks.py中的內容
from __future__ import absolute_import, unicode_literals
from .celery import app # .表示不絕對導入
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
4.啟動celery的worker
elery -A celery_project worker -l info -P eventlet
5.發布任務
進入項目所在文件夾,cmd導入運行
# 這時不能從文件導入函數,只能在上一層文件夾導入文件,相對路徑.xxx不能作為函數主入口
from celery_project import tasks
t = tasks.add.delay(2,3)
t.get()
6.後臺進程(僅linux)
啟動
celery multi start w1 -A proj -l info
停止
celery multi stop w1 -A proj -l info
celery multi stopwait w1 -A proj -l info # 等待任務結束才停止
查看進程
ps -ef |grep celery
三、celery定時任務
celery支持定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模塊叫celery beat
celery beat會循環監聽定時任務,負責交給worker
1.編寫定時任務,加入app的include
from __future__ import absolute_import, unicode_literals
from .celery import app # .表示不絕對導入
from celery.schedules import crontab
@app.on_after_configure.connect # 連接後啟動任務
def setup_periodic_tasks(sender, **kwargs):
# Calls test(‘hello‘) every 10 seconds.
sender.add_periodic_task(10.0, test.s(‘hello‘), name=‘add every 10‘) # 每隔10秒執行,.s相當於delay傳參
# Calls test(‘world‘) every 30 seconds
sender.add_periodic_task(30.0, test.s(‘world‘), expires=10) # 結果保存10秒
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1), # crontab是linux的定時模塊 crontab -e 編寫定時文件 分鐘 小時 日 月 星期幾 command
test.s(‘Happy Mondays!‘),
)
@app.task
def test(arg):
print(‘run fun‘,arg)
2.啟動celery beat
celery -A celery_project.periodic beat -l info # 在根目錄執行,或者理解為任務的命名
# beat就會根據定時時間wake up來把任務交給worker
# 指定beat最後運行時間要存儲在哪個位置,默認當前目錄
celery -A celery_project.periodic beat -s /home/celery/var/run/celerybeat-schedule
配置文件添加定時任務
app.conf.beat_schedule = {
‘add-every-30-seconds‘: {
‘task‘: ‘celery_project.tasks.add‘, # 一個app下該任務的命名
‘schedule‘: 30.0,
‘args‘: (16, 16)
},
}
app.conf.timezone = ‘UTC‘
更復雜的定時配置
用crontab功能,跟linux自帶的crontab功能是一樣的,可以個性化定制任務執行時間
from celery.schedules import crontab
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
‘add-every-monday-morning‘: {
‘task‘: ‘celery_project.tasks.add‘,
‘schedule‘: crontab(hour=7, minute=30, day_of_week=1),
‘args‘: (16, 16),
},
}
四、與django結合使用
django 可以輕松跟celery結合實現異步任務,只需簡單配置即可
1.定義celery項目實例
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the ‘celery‘ program.
os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘MyBlog.settings‘)
app = Celery(‘task‘) # 項目名
# Using a string here means the worker don‘t have to serialize
# the configuration object to child processes.
# - namespace=‘CELERY‘ means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object(‘django.conf:settings‘, namespace=‘CELERY‘) # 使用django的session
# Load task modules from all registered Django app configs.
app.autodiscover_tasks() # 項目下所有app的celery任務發現
@app.task(bind=True)
def debug_task(self):
print(‘Request: {0!r}‘.format(self.request))
2.在django的settings配置celery的連接
#for celery
CELERY_BROKER_URL = ‘redis://:[email protected]:6379/0‘
CELERY_RESULT_BACKEND = ‘redis://:[email protected]:6379/0‘
3.設置celery隨django項目一起啟動
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = [‘celery_app‘]
4.具體的app裏在tasks.py寫要執行的任務,以被worker執行
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task # 多個app共享worker
@shared_task
def add(x,y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
5.在app裏寫django的views,然後調用上面寫好的celery task,這樣訪問頁面時自動發布任務
from django.http import HttpResponse
from celery.result import AsyncResult
from .celery_task import add
task_id=None
def celery_pub(request):
task = add.delay(22,23)
global task_id
task_id = task.id
# 拿到任務id即可返回,不用等待get獲取值,以後再調用id拿值
return HttpResponse(task.id)
def celery_get(request):
global task_id
result = AsyncResult(id=task_id)
return HttpResponse(result.get())
6.進入Django根目錄,通過cmd啟動worker以接收並執行任務
celery -A 項目名 worker -l info
# 訪問頁面拿到id,再通過另一個請求拿到結果。
五、Django定時任務
1.添加django_celery_beat模塊到django的INSTALLED_APPS
INSTALLED_APPS = (
...,
‘django_celery_beat‘,
)
2.創建數據庫中的表格
python manage.py migrate
3.在admin中添加定時任務(已註冊的任務)到數據庫
4.啟動beat去數據庫取任務
celery -A MyBlog beat -l info -S django
5.啟動worker即可
Celery分布式任務