Celery 分散式任務佇列快速入門 Celery 分散式任務佇列快速入門
Celery 分散式任務佇列快速入門
本節內容
Celery介紹和基本使用
在專案中如何使用celery
啟用多個workers
Celery 定時任務
與django結合
通過django配置celery periodic task
一、Celery介紹和基本使用
Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery, 舉幾個例項場景中可用的例子:
- 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程式等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。
- 你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個簡訊祝福
Celery 在執行任務時需要通過一個訊息中介軟體來接收和傳送任務訊息,以及儲存任務結果, 一般使用rabbitMQ or Redis,後面會講
1.1 Celery有以下優點:
- 簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的
- 高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務
- 快速:一個單程序的celery每分鐘可處理上百萬個任務
- 靈活: 幾乎celery的各個元件都可以被擴充套件及自定製
Celery基本工作流程圖
1.2 Celery安裝使用
Celery的預設broker是RabbitMQ, 僅需配置一行就可以
1 |
broker_url =
'amqp://guest:[email protected]:5672//'
|
rabbitMQ 沒裝的話請裝一下,安裝看這裡 http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
使用Redis做broker也可以
安裝redis元件
1 |
$ pip install
-
U
"celery[redis]"
|
配置
Configuration is easy, just configure the location of your Redis database:
app.conf.broker_url = 'redis://localhost:6379/0'
Where the URL is in the format of:
redis://:[email protected]:port/db_number
all fields after the scheme are optional, and will default to localhost
on port 6379, using database 0.
如果想獲取每個任務的執行結果,還需要配置一下把任務結果存在哪
If you also want to store the state and return values of tasks in Redis, you should configure these settings:
app.conf.result_backend = 'redis://localhost:6379/0'
1. 3 開始使用Celery啦
安裝celery模組
1 |
$ pip install celery
|
建立一個celery application 用來定義你的任務列表
建立一個任務檔案就叫tasks.py吧
1 2 3 4 5 6 7 8 9 10 |
from
celery
import
Celery
app
=
Celery(
'tasks'
,
broker
=
'redis://localhost'
,
backend
=
'redis://localhost'
)
@app
.task
def
add(x,y):
print
(
"running..."
,x,y)
return
x
+
y
|
啟動Celery Worker來開始監聽並執行任務
1 |
$ celery -A tasks worker --loglevel=info
|
呼叫任務
再開啟一個終端, 進行命令列模式,呼叫任務
1 2 |
>>>
from
tasks
import
add
>>> add.delay(
4
,
4
)
|
看你的worker終端會顯示收到 一個任務,此時你想看任務結果的話,需要在呼叫 任務時 賦值個變數
1 |
>>> result
=
add.delay(
4
,
4
)
|
The ready()
method returns whether the task has finished processing or not:
>>> result.ready()
False
You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:
>>> result.get(timeout=1) 8
In case the task raised an exception, get()
will re-raise the exception, but you can override this by specifying the propagate
argument:
>>> result.get(propagate=False)
If the task raised an exception you can also gain access to the original traceback:
>>> result.traceback
…
二、在專案中如何使用celery
可以把celery配置成一個應用
目錄格式如下
1 2 3 |
proj
/__init__
.py
/celery
.py
/tasks
.py
|
proj/celery.py內容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
from
__future__
import
absolute_import, unicode_literals
from
celery
import
Celery
app
=
Celery(
'proj'
,
broker
=
'amqp://'
,
backend
=
'amqp://'
,
include
=
[
'proj.tasks'
])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires
=
3600
,
)
if
__name__
=
=
'__main__'
:
app.start()
|
proj/tasks.py中的內容
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
啟動worker
1 |
$ celery -A proj worker -l info
|
輸出
1 2 3 4 5 6 7 8 9 10 11 12 13 |
-------------- [email protected]
local
v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x103a020f0
- ** ---------- .> transport: redis:
//localhost
:6379
//
- ** ---------- .> results: redis:
//localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (
enable
-E to monitor tasks
in
this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
|
後臺啟動worker
In the background
In production you’ll want to run the worker in the background, this is described in detail in the daemonization tutorial.
The daemonization scripts uses the celery multi command to start one or more workers in the background:
$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
> w1.halcyon.local: OK
You can restart it too:
$ celery multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
> Waiting for 1 node..... > w1.halcyon.local: OK > Restarting node w1.halcyon.local: OK celery multi v4.0.0 (latentcall) > Stopping nodes... > w1.halcyon.local: TERM -> 64052
or stop it:
$ celery multi stop w1 -A proj -l info
The stop
command is asynchronous so it won’t wait for the worker to shutdown. You’ll probably want to use the stopwait
command instead, this ensures all currently executing tasks is completed before exiting:
$ celery multi stopwait w1 -A proj -l info
三、Celery 定時任務
celery支援定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模組叫celery beat
寫一個指令碼 叫periodic_task.py1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
from
celery
import
Celery
from
celery.schedules
import
crontab
app
=
Celery()
@app
.on_after_configure.connect
def
setup_periodic_tasks(sender,
*
*
kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(
10.0
, test.s(
'hello'
), name
=
'add every 10'
)
# Calls test('world') every 30 seconds
sender.add_periodic_task(
30.0
, test.s(
'world'
), expires
=
10
)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour
=
7
, minute
=
30
, day_of_week
=
1
),
test.s(
'Happy Mondays!'
),
)
@app
.task
def
test(arg):
print
(arg)
|
add_periodic_task 會新增一條定時任務
上面是通過呼叫函式新增定時任務,也可以像寫配置檔案 一樣的形式新增, 下面是每30s執行的任務
1 2 3 4 5 6 7 8 |
app.conf.beat_schedule
=
{
'add-every-30-seconds'
: {
'task'
:
'tasks.add'
,
'schedule'
:
30.0
,
'args'
: (
16
,
16
)
},
}
app.conf.timezone
=
'UTC'
|
任務新增好了,需要讓celery單獨啟動一個程序來定時發起這些任務, 注意, 這裡是發起任務,不是執行,這個程序只會不斷的去檢查你的任務計劃, 每發現有任務需要執行了,就發起一個任務呼叫訊息,交給celery worker去執行
啟動任務排程器 celery beat
1 |
$ celery -A periodic_task beat
|
輸出like below
1 2 3 4 5 6 7 8 9 10 |
celery beat v4.0.2 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2017-02-08 18:39:31
Configuration ->
. broker -> redis:
//localhost
:6379
//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
|
此時還差一步,就是還需要啟動一個worker,負責執行celery beat發起的任務
啟動celery worker來執行任務
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
$ celery -A periodic_task worker
-------------- [email protected]
local
v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-02-08 18:42:08
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x104d420b8
- ** ---------- .> transport: redis:
//localhost
:6379
//
- ** ---------- .> results: redis:
//localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (
enable
-E to monitor tasks
in
this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
|
好啦,此時觀察worker的輸出,是不是每隔一小會,就會執行一次定時任務呢!
注意:Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:
1 |
$ celery -A periodic_task beat -s
/home/celery/var/run/celerybeat-schedule
|
更復雜的定時配置
上面的定時任務比較簡單,只是每多少s執行一個任務,但如果你想要每週一三五的早上8點給你發郵件怎麼辦呢?哈,其實也簡單,用crontab功能,跟linux自帶的crontab功能是一樣的,可以個性化定製任務執行時間
linux crontab http://www.cnblogs.com/peida/archive/2013/01/08/2850483.html
1 2 3 4 5 6 7 8 9 10 |
from
celery.schedules
import
crontab
app.conf.beat_schedule
=
{
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning'
: {
'task'
:
'tasks.add'
,
'schedule'
: crontab(hour
=
7
, minute
=
30
, day_of_week
=
1
),
'args'
: (
16
,
16
),
},
}
|
上面的這條意思是每週1的早上7.30執行tasks.add任務
還有更多定時配置方式如下:
Example | Meaning |
crontab() |
Execute every minute. |
crontab(minute=0, hour=0) |
Execute daily at midnight. |
crontab(minute=0, hour='*/3') |
Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
|
Same as previous. |
crontab(minute='*/15') |
Execute every 15 minutes. |
crontab(day_of_week='sunday') |
Execute every minute (!) at Sundays. |
|
Same as previous. |
|
Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays. |
crontab(minute=0,hour='*/2,*/3') |
Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour='*/5') |
Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5). |
crontab(minute=0, hour='*/3,8-17') |
Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
crontab(0, 0,day_of_month='2') |
Execute on the second day of every month. |
|
Execute on every even numbered day. |
|
Execute on the first and third weeks of the month. |
|
Execute on the eleventh of May every year. |
|
Execute on the first month of every quarter. |
上面能滿足你絕大多數定時任務需求了,甚至還能根據潮起潮落來配置定時任務, 具體看 http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#solar-schedules
四、最佳實踐之與django結合
django 可以輕鬆跟celery結合實現非同步任務,只需簡單配置即可
If you have a modern Django project layout like:
- proj/
- proj/__init__.py - proj/settings.py - proj/urls.py - manage.py