1. 程式人生 > 實用技巧 >Python實用模組(二十)Apscheduler

Python實用模組(二十)Apscheduler

Python實用模組(二十)Apscheduler

Python實用模組迷途小書童3個月前 (07-17)0評論

軟硬體環境

視訊看這裡

前言

說起定時任務,第一反應應該是windows自帶的計劃任務或者linux自帶的crontab,關於ubuntu作業系統下如何使用crontab可以參考下面這支視訊。apscheduler是一款使用python語言開發的定時任務工具,提供了非常豐富而且簡單易用的定時任務介面

安裝

安裝非常簡單, 使用pip

pip install apscheduler

apscheduler的四大元件

  • triggers 觸發器 可以按照日期、時間間隔或者contab表示式三種方式觸發
  • job stores 作業儲存器 指定作業存放的位置,預設儲存在記憶體,也可以儲存在各種資料庫中
  • executors 執行器 將指定的作業提交到執行緒池或者程序池中執行
  • schedulers 作業排程器 常用的有BackgroundScheduler(後臺執行)和BlockingScheduler(阻塞式)

程式碼實踐

下面通過幾個示例來看看如何來使用apscheduler

import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BlockingScheduler()

    # 間隔設定為1秒,還可以使用minutes、hours、days、weeks等
    intervalTrigger=IntervalTrigger(seconds=1)

    # 給作業設個id,方便作業的後續操作,暫停、取消等
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()
    print('=== end. ===')

執行程式碼,輸出是這樣的

因為我們使用了BlockingScheduler,它是阻塞式的,所以只看到了my_job方法中的輸出,而語句print('=== end. ===')並沒有被執行。BackgroundScheduler它可以在後臺執行,不會阻塞主執行緒的執行,來看下面的程式碼

import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BackgroundScheduler()
    intervalTrigger=IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()
    print('=== end. ===')
    while True:
        time.sleep(1)

程式碼執行的結果是這樣的

如果把triggers設定成DateTrigger,就變成作業在某一個時間點執行,示例如下

import time
import datetime
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.date import DateTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BlockingScheduler()
    intervalTrigger=DateTrigger(run_date='2020-07-17 16:18:55')
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()

等到設定的時間到了,作業就會被自行一次

如果想按照指定的週期去執行的話,就需要使用CronTrigger了,工作原理跟UNIXcrontab定時任務非常相似,它可以指定非常詳細且複雜的規則。同樣的來看示例程式碼

import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BlockingScheduler()

    # 第一秒執行作業
    intervalTrigger=CronTrigger(second=1)

    # 每天的19:30:01執行作業
    # intervalTrigger=CronTrigger(hour=19, minute=30, second=1)

    # 每年的10月1日19點執行作業
    # intervalTrigger=CronTrigger(month=10, day=1, hour=19)

    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()

程式碼執行的效果是這樣的

上面的程式碼中並沒有使用executors,因為只有一個作業,但是從除錯中可以發現,預設情況下,apscheduler也是使用了ThreadPoolExecutor,且執行緒池的大小是10

下面我們看看executors的使用

import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":

    executors = {
        'default': ThreadPoolExecutor(20)
    }

    scheduler = BlockingScheduler(executors=executors)

    intervalTrigger=IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()

可以看到,我們將執行緒池的大小改為了20,在初始化scheduler的時候將executors傳遞進去,後面的操作就跟之前的完全一樣了

關於ThreadPoolExecutorProcessPoolExecutor的選擇問題,這裡有一個原則,如果是cpu密集型的作業,使用ProcessPoolExecutor,其它的使用ThreadPoolExecutor,當然ThreadPoolExecutorProcessPoolExecutor也是可以混用的

最後我們來看看作業儲存器,我們把它改成儲存到sqlite

import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore


def my_job():
    print('my_job, {}'.format(time.ctime()))

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}


if __name__ == "__main__":

    executors = {
        'default': ThreadPoolExecutor(20)
    }

    scheduler = BlockingScheduler(jobstores=jobstores, executors=executors)

    intervalTrigger=IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()

程式碼執行後,會在原始碼目錄下生成sqlite資料庫檔案jobs.sqlite,我們使用圖形化工具開啟檢視,可以看到資料庫中存放的作業的id和作業下次執行的時間

參考資料