Celery學習筆記(一)
開始使用Celery
使用celery包含三個方面:1. 定義任務函式。2. 執行celery服務。3. 客戶應用程式的呼叫
1.使用 python 虛擬環境 模擬兩個不同的 主機。
2.建立一個檔案 tasks.py輸入下列程式碼:
from celery import Celery broker = 'redis://127.0.0.1:6379/5' backend = 'redis://127.0.0.1:6379/6' app = Celery('tasks', broker=broker, backend=backend) @app.task def add(x, y): return x + y
上述程式碼匯入了celery,然後建立了celery 例項 app,例項化的過程中指定了任務名tasks(和檔名一致),傳入了broker和backend。然後建立了一個任務函式add。
3.啟動celery服務。在當前命令列終端執行(分別在 env1 和 env2 下執行):
celery -A tasks worker --loglevel=info
此時可以看到輸出,其中包括註冊的任務。
客戶端互動式除錯
在當前tasks.py檔案下進入python3環境
In [0]:from tasks import add In [1]: r = add.delay(2, 2) In [2]: add.delay(2, 2) Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d> In [3]: r = add.delay(3, 3) In [4]: r.re r.ready r.result r.revoke In [4]: r.ready() Out[4]: True In [6]: r.result Out[6]: 6 In [7]: r.get() Out[7]: 6
呼叫 delay 函式即可啟動 add 這個任務。這個函式的效果是傳送一條訊息到broker中去,這個訊息包括要執行的函式、函式的引數以及其他資訊,具體的可以看 Celery官方文件。這個時候 worker 會等待 broker 中的訊息,一旦收到訊息就會立刻執行訊息。 現在是在python環境中呼叫的add函式,實際上通常在應用程式中呼叫這個方法。
注意:如果把返回值賦值給一個變數,那麼原來的應用程式也會被阻塞,需要等待非同步任務返回的結果。因此,實際使用中,不需要把結果賦值。
啟動了一個任務之後,可以看到之前啟動的worker已經開始執行任務了。
使用配置檔案
Celery 的配置比較多,可以在
上述的使用是簡單的配置,下面介紹一個更健壯的方式來使用celery。首先建立一個python包,celery服務,姑且命名為proj。目錄檔案如下:
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',include=['proj.tasks'])
app.config_from_object('proj.config')
if __name__ == '__main__':
app.start()
此時並沒有指定broker和backend。
# -*- coding:utf-8 -*-
from __future__ import absolute_import
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x,y):
return x + y
使用方法:在 proj 的同一級目錄執行 celery:
celery -A proj worker -l info
客戶端使用任務直接呼叫tasks檔案的函式即可
任務排程
新建一個proj1的python包
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from celery import Celery
app = Celery('proj1',include=['proj1.tasks'])
app.config_from_object('proj1.celeryconfig')
if __name__ == '__main__':
app.start()
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
@app.task
def taskA(x,y):
return x + y
@app.task
def taskB(x,y,z):
return x + y + z
@app.task
def taskC(x,y):
return x + y
上面的tasks.py中,首先定義了一個Celery物件,然後用celeryconfig.py對celery物件進行設定,之後再分別定義了三個task,分別是taskA,taskB和taskC。接下來看一下 celeryconfig.py 檔案
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from kombu import Exchange, Queue
BROKER_URL = "redis://192.168.59.129:6379/0"
CELERY_RESULT_BACKEND = "redis://192.168.59.129:6379/0"
# 任務佇列分配
CELERY_QUEUES = (
Queue('default',Exchange("default"),routing_key="default"),
Queue('for_task_A',Exchange('for_task_A'),routing_key='task_a'),
Queue('for_task_B',Exchange('for_task_B'),routing_key='task_b')
)
CELERY_ROUTES = {
'tasks.taskA': {"queue": "for_task_A", "routing_key": "task_a"},
'tasks.taskB': {"queue": "for_task_B", "routing_key": "task_b"}
}
在 celeryconfig.py 檔案中,首先設定了brokel以及result_backend,接下來定義了三個Message Queue,並且指明瞭Queue對應的Exchange(當使用Redis作為broker時,Exchange的名字必須和Queue的名字一樣)以及routing_key的值。
現在在一臺主機上面啟動一個worker,這個worker只執行for_task_A佇列中的訊息,這是通過在啟動worker時使用-Q Queue_Name引數指定的。
celery -A tasks worker -l info -n worker.%h -Q for_task_A
然後在另一臺主機中,切換路徑到工程目錄下,執行以下程式碼來啟動taskA:
from tasks improt *
task_A_re = taskA.delay(100,200)
此時task_A訊息會立馬傳送到for_task_A佇列中。剛開啟任務A的主機立即執行任務。
重複上面的過程,在另外一臺機器上啟動一個worker專門執行for_task_B中的任務。修改上一步驟的程式碼,把 taskA 改成 taskB 並執行。 而在 celeryconfig.py 檔案中還定義了個預設路由default,沒有指定任務route到哪個queue中執行或者指定的route沒配置有,執行任務的時候都會自動分配到default佇列中。
Scheduler(定時任務,週期性任務)
現實中我們經常會遇到定時任務的需求。而在celery中實現定時任務也很簡單。只需要設定celery物件的CELERYBEAT_SCHEDULE屬性即可。
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from kombu import Exchange, Queue
BROKER_URL = "redis://192.168.59.129:6379/0"
CELERY_RESULT_BACKEND = "redis://192.168.59.129:6379/0"
# 任務佇列分配
CELERY_QUEUES = (
Queue('default',Exchange("default"),routing_key="default"),
Queue('for_task_A',Exchange('for_task_A'),routing_key='task_a'),
Queue('for_task_B',Exchange('for_task_B'),routing_key='task_b')
)
CELERY_ROUTES = {
'tasks.taskA': {"queue": "for_task_A", "routing_key": "task_a"},
'tasks.taskB': {"queue": "for_task_B", "routing_key": "task_b"}
}
# Scheduler (定時任務,週期性任務)
CELERY_TIMEZONE = 'Asia/Shanghai'
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'taskA_schedule':{
'task': 'proj1.tasks.taskA',
'schedule': timedelta(seconds=5),
'args' : (100,100)
},
'taskB_schedule':{
'task': 'proj1.tasks.taskB',
'schedule': timedelta(seconds=10),
'args' : (100,200,300)
},
'taskC_schedule':{
'task': 'proj1.tasks.taskC',
'schedule': timedelta(seconds=15),
'args' : (200,200)
}
}
定義了3個定時任務,即每隔5s執行taskA任務,引數為(100,100),每隔10s執行taskB任務,引數為(100,200,300),每隔15s執行taskC任務,引數為(200,200).切換到工程同級目錄下,通過下列命令啟動定時任務: celery -A proj worker -B -l info。使用 beat 引數即可啟動定時任務。
crontab定時
celery也能通過crontab模式來設定定時。修改celeryconfig檔案
# from datetime import timedelta
#
# CELERYBEAT_SCHEDULE = {
# 'taskA_schedule':{
# 'task': 'proj1.tasks.taskA',
# 'schedule': timedelta(seconds=0.5),
# 'args' : (100,100)
# },
# 'taskB_schedule':{
# 'task': 'proj1.tasks.taskB',
# 'schedule': timedelta(seconds=0.5),
# 'args' : (100,200,300)
# },
# 'taskC_schedule':{
# 'task': 'proj1.tasks.taskC',
# 'schedule': timedelta(seconds=0.5),
# 'args' : (200,200)
# }
# }
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 A.M
'add-every-monday-morning': {
'task': 'tasks.tasksA',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}