任務排程schedule和celery
1. schedule
如果要實現一個小的定時任務指令碼,可以採用schedule這個輕量級定時任務排程庫。
import schedule import time def job(name): print(name,'do something...') # 每十分鐘執行任務 schedule.every(10).minutes.do(job, name) # 每小時執行任務 schedule.every().hour.do(job, name) # 每天定點執行任務 schedule.every().day.at('10:30').do(job, name) # 每週一執行任務 schedule.every().monday.do(job, name) # 每週一定點任務 schedule.every().monday.at('10:30').do(job, name) while True: # 保持任務執行 schedule.run_pending() time.sleep(1)
值得注意的是,如果是多個任務執行,實際上他們是序列執行的。如果上面的任務耗時,會影響下面任務的執行。
對於這種情況,可以使用多執行緒/多程序來解決。
import datetime import schedule import threading import time def job1(): print('this is job1') time.sleep(2) print('job1:',datetime.datetime.now()) def job2(): print('this is job2') time.sleep(2) print('job2:',datetime.datetime.now()) def task1(): threading.Thread(target=job1).start() def task2(): threading.Thread(target=job2).start() def run(): schedule.every(10).seconds.do(task1) schedule.every(10).seconds.do(task2) while True: schedule.run_pending() time.sleep(1)
schedule的使用比較簡單,就是一個死迴圈執行任務,因此定時任務job不應該是死迴圈型別的,這個任務執行緒需要有一個執行完畢的出口,否則會導致無限迴圈問題。另外一點是,定時任務的執行時間如果比schedule的間隔時間長,同樣會造成執行緒堆積問題,引發異常。
2. celery
celery是一個強大的分散式任務佇列,相比schedule更加完備而強大,同時也更加“重”。它可以讓任務的執行完全脫離主程式,甚至是分配到其他主機上執行。
通常使用celery來實現非同步任務和定時任務。其結構組成如下:
可以看到,celery主要包含以下幾個模組:
- 任務模組
包含非同步任務與定時任務。非同步任務通常在業務邏輯中被觸發,並被髮往任務佇列;定時任務由celery beat程序週期性地將任務發往任務佇列。
- 訊息中介軟體broker
broker,即為任務排程佇列,接收任務生產者發來的任務訊息,將任務存入佇列。celery本身不提供任務佇列,推薦使用RabbitMQ和Redis。
- 任務執行單元worker
worker實時監控訊息佇列,獲取排程的任務,並執行。
- 任務結果儲存backend
backend用於儲存任務的執行結果,通訊息中介軟體一樣,儲存可使用RabbitMQ,Redis,MongoDB等。
非同步任務
使用celery實現非同步任務主要包括三個步驟:
- 建立一個celery例項
- 啟動celery worker
- 程式呼叫非同步任務
以下做具體介紹:
- 1.建立celery例項
這裡使用Redis作為broker和backend。
建立檔案task.py
import time
from celery import Celery
# 指定訊息中介軟體用redis
broker = 'redis://127.0.0.1:6379'
# 指定儲存用redis
backend = 'redis://127.0.0.1:6379/0'
# 建立一個celery例項app,名稱為my_task
app = Celery('my_task', broker=broker, backend=backend)
# 建立一個celery任務add,被@app.task裝飾後,成為可被排程的任務
@app.task
def add(x, y):
time.sleep(5) # 模擬耗時操作
return x+y
- 2.啟動celery worker
在當前目錄下,使用如下方式啟動celery worker
celery worker -A task --loglevel=info
其中:
- 引數-A指定了celery例項的位置,這裡是task.py中,celery會自動在該檔案中尋找celery例項物件,當然也可以直接指定為-A task.app;
- 引數--loglevel指定了日誌級別,預設為warning,也可以使用-l info來表示;
- 3.呼叫任務
現在可以使用delay()或者apply_async()方法來呼叫任務。
在當前目錄下,開啟Python控制檯,輸入如下:
我們從task.py中匯入了add任務物件,然後使用delay()方法傳送任務到broker,worker程序監測到該任務後就執行,
這時發現報錯,原因是在Windows系統下使用celery4版本,解決方法是安裝一個eventlet包,然後啟動worker時加一個引數
celery worker -A task -l info -P eventlet
然後就可以正常使用了。
另外如果想獲取執行後的結果,可以這樣做:
上面是在互動環境中呼叫任務,實際上通常在程式用呼叫,建立client.py如下:
from task import add
import time
print('開始時間', time.ctime())
add.delay(2,5)
print('完成時間', time.ctime())
然後執行檔案,結果如下:
可以看出,雖然任務函式需要等待5秒才返回結果,但是由於是一個非同步任務,不會阻塞當前主程式,所以立刻執行了列印完成的語句。
相比直接把broker和backend配置寫入程式程式碼中,更好的方式是增加一個配置檔案,通常命名為`celeryconfig.py`。
__init__.py程式碼如下:
from celery import Celery
# 建立celery例項
app = Celery('demo')
# 通過celery例項載入配置模組
app.config_from_object('celery_app.celeryconfig')
celeryconfig.py程式碼如下:
# 指定broker
BROKER_URL = 'redis://127.0.0.1:6379'
# 指定backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 指定時區
CELERY_TIMEZONE = 'Asia/Shanghai'
# 指定匯入的任務模組
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)
task1.py程式碼如下:
import time
from celery_app import app
@app.task
def add(x,y):
time.sleep(2)
return x+y
task2.py程式碼如下:
import time
from celery_app import app
@app.task
def multiply(x,y):
time.sleep(2)
return x*y
client.py程式碼如下:
import time
from celery_app import task1
from celery_app import task2
print('開始', time.ctime())
task1.add.delay(2,3) # delay是apply_async的快捷方式
task2.multiply.apply_async(args=[2,3])
print('完成', time.ctime())
現在可以啟動worker程序
然後執行python命令執行client.py檔案。
在worker視窗,我們可以看到任務的執行
定時任務
celery beat程序通過讀取配置檔案的內容,週期性地將定時任務發往任務佇列。
除了celerconfig.py內容增加了定時排程內容,其他模組和非同步任務相同。
celerconfig.py程式碼如下:
from datetime import timedelta
from celery.schedules import crontab
# 指定broker
BROKER_URL = 'redis://127.0.0.1:6379'
# 指定backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 指定時區
CELERY_TIMEZONE = 'Asia/Shanghai'
# 指定匯入的任務模組
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)
# 定時排程schedules
CELERYBEAT_SCHEDULE={
'add-every-30-seconds':{
'task':'celery_app.task1.add',
'schedule':timedelta(seconds=30), # 每30秒執行一次
'args':(2,3) # 任務函式引數
},
'multiply-every-30-seconds':{
'task':'celery_app.task2.multiply',
'schedule':crontab(hour=14,minute=30), # 每天下午2點30分執行一次
'args':(2,3) # 任務函式引數
}
}
現在,啟動worker程序,然後啟動beat程序,定時任務將被髮送到broker
之後在worker視窗,可以看到task1每30秒執行一次,task2則定點執行一次。
為了簡化,也可以將啟動worker程序和beat程序放在一條命令中:
celery -B -A celery_app worker -l info -P eventlet