Python實用模組(二十)Apscheduler
Python實用模組(二十)Apscheduler
Python實用模組迷途小書童3個月前 (07-17)0評論軟硬體環境
- windows 10 64bits
- anaconda withpython3.7
- apscheduler3.6.3
視訊看這裡
前言
說起定時任務,第一反應應該是windows
自帶的計劃任務或者linux
自帶的crontab
,關於ubuntu
作業系統下如何使用crontab
可以參考下面這支視訊。apscheduler
是一款使用python
語言開發的定時任務工具,提供了非常豐富而且簡單易用的定時任務介面
安裝
安裝非常簡單, 使用pip
pip install apscheduler
apscheduler的四大元件
- triggers 觸發器 可以按照日期、時間間隔或者
contab
表示式三種方式觸發 - job stores 作業儲存器 指定作業存放的位置,預設儲存在記憶體,也可以儲存在各種資料庫中
- executors 執行器 將指定的作業提交到執行緒池或者程序池中執行
- schedulers 作業排程器 常用的有
BackgroundScheduler
(後臺執行)和BlockingScheduler
(阻塞式)
程式碼實踐
下面通過幾個示例來看看如何來使用apscheduler
import time from apscheduler.schedulers.background import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger def my_job(): print('my_job, {}'.format(time.ctime())) if __name__ == "__main__": scheduler = BlockingScheduler() # 間隔設定為1秒,還可以使用minutes、hours、days、weeks等 intervalTrigger=IntervalTrigger(seconds=1) # 給作業設個id,方便作業的後續操作,暫停、取消等 scheduler.add_job(my_job, intervalTrigger, id='my_job_id') scheduler.start() print('=== end. ===')
執行程式碼,輸出是這樣的
因為我們使用了BlockingScheduler
,它是阻塞式的,所以只看到了my_job
方法中的輸出,而語句print('=== end. ===')
並沒有被執行。BackgroundScheduler
它可以在後臺執行,不會阻塞主執行緒的執行,來看下面的程式碼
import time from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger def my_job(): print('my_job, {}'.format(time.ctime())) if __name__ == "__main__": scheduler = BackgroundScheduler() intervalTrigger=IntervalTrigger(seconds=1) scheduler.add_job(my_job, intervalTrigger, id='my_job_id') scheduler.start() print('=== end. ===') while True: time.sleep(1)
程式碼執行的結果是這樣的
如果把triggers
設定成DateTrigger
,就變成作業在某一個時間點執行,示例如下
import time
import datetime
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.date import DateTrigger
def my_job():
print('my_job, {}'.format(time.ctime()))
if __name__ == "__main__":
scheduler = BlockingScheduler()
intervalTrigger=DateTrigger(run_date='2020-07-17 16:18:55')
scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
scheduler.start()
等到設定的時間到了,作業就會被自行一次
如果想按照指定的週期去執行的話,就需要使用CronTrigger
了,工作原理跟UNIX
中crontab
定時任務非常相似,它可以指定非常詳細且複雜的規則。同樣的來看示例程式碼
import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
def my_job():
print('my_job, {}'.format(time.ctime()))
if __name__ == "__main__":
scheduler = BlockingScheduler()
# 第一秒執行作業
intervalTrigger=CronTrigger(second=1)
# 每天的19:30:01執行作業
# intervalTrigger=CronTrigger(hour=19, minute=30, second=1)
# 每年的10月1日19點執行作業
# intervalTrigger=CronTrigger(month=10, day=1, hour=19)
scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
scheduler.start()
程式碼執行的效果是這樣的
上面的程式碼中並沒有使用executors
,因為只有一個作業,但是從除錯中可以發現,預設情況下,apscheduler
也是使用了ThreadPoolExecutor
,且執行緒池的大小是10
下面我們看看executors
的使用
import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
def my_job():
print('my_job, {}'.format(time.ctime()))
if __name__ == "__main__":
executors = {
'default': ThreadPoolExecutor(20)
}
scheduler = BlockingScheduler(executors=executors)
intervalTrigger=IntervalTrigger(seconds=1)
scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
scheduler.start()
可以看到,我們將執行緒池的大小改為了20,在初始化scheduler
的時候將executors
傳遞進去,後面的操作就跟之前的完全一樣了
關於ThreadPoolExecutor
和ProcessPoolExecutor
的選擇問題,這裡有一個原則,如果是cpu
密集型的作業,使用ProcessPoolExecutor
,其它的使用ThreadPoolExecutor
,當然ThreadPoolExecutor
和ProcessPoolExecutor
也是可以混用的
最後我們來看看作業儲存器,我們把它改成儲存到sqlite
中
import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
def my_job():
print('my_job, {}'.format(time.ctime()))
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
if __name__ == "__main__":
executors = {
'default': ThreadPoolExecutor(20)
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors)
intervalTrigger=IntervalTrigger(seconds=1)
scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
scheduler.start()
程式碼執行後,會在原始碼目錄下生成sqlite
資料庫檔案jobs.sqlite
,我們使用圖形化工具開啟檢視,可以看到資料庫中存放的作業的id
和作業下次執行的時間