1. 程式人生 > 程式設計 >python 定時任務框架APScheduler

python 定時任務框架APScheduler

1、 簡介

APScheduler的全稱是Advanced Python Scheduler。它是一個輕量級的 Python 定時任務排程框架。APScheduler 支援三種排程任務:固定時間間隔,固定時間點(日期),Linux 下的 Crontab 命令。同時,它還支援非同步執行、後臺執行排程任務。

2、 安裝

使用 pip 包管理工具安裝 APScheduler 是最方便快捷的。

pip install APScheduler

3 、使用步驟

APScheduler 使用起來還算是比較簡單。執行一個排程任務只需要以下三部曲。

新建一個 schedulers (排程器) 。 新增一個排程任務(job stores)。 執行排程任務。

下面是執行定時任務的簡單示例程式碼

#!/usr/bin/env python
# coding:utf-8

import time
from apscheduler.schedulers.blocking import BlockingScheduler
import logging

LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(filename='mypython.log',level=logging.INFO,format=LOG_FORMAT)

today = time.strftime('%Y%m%d'
,time.localtime(time.time())) nowDateTime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())) global num num = 0 class myAP: def __init__(self): print("hello") def sayHello(): global num num = num + 1 print(nowDateTime+' 任務一 hello word %d' % num) def sayHello11(): global num num = num + 1 print
(nowDateTime+' 任務二 o(* ̄︶ ̄*)o hello word %d' % num) if __name__ == "__main__": # BlockingScheduler scheduler = BlockingScheduler() # scheduler.add_job(sayHello,'cron',day_of_week='0-6',hour=19,minute=50) scheduler.add_job(sayHello,'interval',seconds=2) scheduler.add_job(sayHello11,seconds=3) scheduler.start() 複製程式碼

執行結果如圖

定時任務執行結果

4、 基礎元件

APScheduler 有四種元件,分別是:排程器(scheduler),作業儲存(job store),觸發器(trigger),執行器(executor)。

schedulers(排程器) 它是任務排程器,屬於控制器角色。它配置作業儲存器和執行器可以在排程器中完成,例如新增、修改和移除作業。

triggers(觸發器) 描述排程任務被觸發的條件。不過觸發器完全是無狀態的。

job stores(作業儲存器) 任務持久化倉庫,預設儲存任務在記憶體中,也可將任務儲存都各種資料庫中,任務中的資料序列化後儲存到持久化資料庫,從資料庫載入後又反序列化。

executors(執行器) 負責處理作業的執行,它們通常通過在作業中提交指定的可呼叫物件到一個執行緒或者進城池來進行。當作業完成時,執行器將會通知排程器。

4.1 schedulers(排程器)

我個人覺得 APScheduler 非常好用的原因。它提供 7 種排程器,能夠滿足我們各種場景的需要。例如:後臺執行某個操作,非同步執行操作等。排程器分別是:

BlockingScheduler : 排程器在當前程式的主執行緒中執行,也就是會阻塞當前執行緒。 BackgroundScheduler : 排程器在後臺執行緒中執行,不會阻塞當前執行緒。 AsyncIOScheduler : 結合 asyncio 模組(一個非同步框架)一起使用。 GeventScheduler : 程式中使用 gevent(高效能的Python併發框架)作為IO模型,和 GeventExecutor 配合使用。 TornadoScheduler : 程式中使用 Tornado(一個web框架)的IO模型,用 ioloop.add_timeout 完成定時喚醒。 TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定時喚醒。 QtScheduler : 你的應用是一個 Qt 應用,需使用QTimer完成定時喚醒。

4.2 triggers(觸發器)

APScheduler 有三種內建的 trigger: 1)date 觸發器 date 是最基本的一種排程,作業任務只會執行一次。它表示特定的時間點觸發。它的引數如下:

引數 說明
run_date (datetime 或 str) 作業的執行日期或時間
timezone (datetime.tzinfo 或 str) 指定時區

date 觸發器使用示例如下:

from datetime import datetime
from datetime import date
from apscheduler.schedulers.background import BackgroundScheduler

def job_func(text):
    print(text)

scheduler = BackgroundScheduler()
# 在 2017-12-13 時刻執行一次 job_func 方法
scheduler .add_job(job_func,'date',run_date=date(2017,12,13),args=['text'])
# 在 2017-12-13 14:00:00 時刻執行一次 job_func 方法
scheduler .add_job(job_func,run_date=datetime(2017,13,14,0),args=['text'])
# 在 2017-12-13 14:00:01 時刻執行一次 job_func 方法
scheduler .add_job(job_func,run_date='2017-12-13 14:00:01',args=['text'])
scheduler.start()

複製程式碼
interval 觸發器

固定時間間隔觸發。interval 間隔排程,引數如下:

引數 說明
weeks (int) 間隔幾周
days (int) 間隔幾天
hours (int) 間隔幾小時
minutes (int) 間隔幾分鐘
seconds (int) 間隔多少秒
start_date (datetime 或 str) 開始日期
end_date (datetime 或 str) 結束日期
timezone (datetime.tzinfo 或str) 時區
cron 觸發器

在特定時間週期性地觸發,和Linux crontab格式相容。它是功能最強大的觸發器。 我們先了解 cron 引數:

引數 說明
year (int 或 str) 年,4位數字
month (int 或 str) 月 (範圍1-12)
day (int 或 str) 日 (範圍1-31
week (int 或 str) 周 (範圍1-53)
day_of_week (int 或 str) 周內第幾天或者星期幾 (範圍0-6 或者mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 時 (範圍0-23)
minute (int 或 str) 分 (範圍0-59)
second (int 或 str) 秒 (範圍0-59)
start_date (datetime 或 str) 最早開始日期(包含)
end_date (datetime 或 str) 最晚結束時間(包含)
timezone (datetime.tzinfo 或str) 指定時區

cron 觸發器使用示例如下:

import datetime
from apscheduler.schedulers.background import BackgroundScheduler

def job_func(text): print("當前時間:",datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
scheduler = BackgroundScheduler() 

# 在每年 1-3、7-9 月份中的每個星期一、二中的 00:00,01:00,02:00 和 03:00 執行 job_func 任務
scheduler .add_job(job_func,'cron',month='1-3,7-9',day='0,tue',hour='0-3') 
scheduler.start()

複製程式碼

4.3 作業儲存(job store)

該元件是對排程任務的管理。 1)新增 job 有兩種新增方法,其中一種上述程式碼用到的 add_job(), 另一種則是scheduled_job()修飾器來修飾函式。

這個兩種辦法的區別是:第一種方法返回一個 apscheduler.job.Job 的例項,可以用來改變或者移除 job。第二種方法只適用於應用執行期間不會改變的 job。

第二種新增任務方式的例子:

@scheduler.scheduled_job(job_func,minutes=2) 
def job_func(text): 
    print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) 
    scheduler = BackgroundScheduler() 
    scheduler.start()

複製程式碼

2)移除 job 移除 job 也有兩種方法:remove_job() 和 job.remove()。 remove_job() 是根據 job 的 id 來移除,所以要在 job 建立的時候指定一個 id。 job.remove() 則是對 job 執行 remove 方法即可

scheduler.add_job(job_func,minutes=2,id='job_one')
scheduler.remove_job(job_one)
job = add_job(job_func,id='job_one')
job.remvoe()
複製程式碼

3)獲取 job 列表 通過 scheduler.get_jobs() 方法能夠獲取當前排程器中的所有 job 的列表

修改 job 如果你因計劃改變要對 job 進行修改,可以使用Job.modify() 或者 modify_job()方法來修改 job 的屬性。但是值得注意的是,job 的 id 是無法被修改的。

scheduler.add_job(job_func,id='job_one')
scheduler.start()
# 將觸發時間間隔修改成 5分鐘
scheduler.modify_job('job_one',minutes=5)

job = scheduler.add_job(job_func,minutes=2)
# 將觸發時間間隔修改成 5分鐘
job.modify(minutes=5)
複製程式碼

5)關閉 job 預設情況下排程器會等待所有正在執行的作業完成後,關閉所有的排程器和作業儲存。如果你不想等待,可以將 wait 選項設定為 False。

scheduler.shutdown()
scheduler.shutdown(wait=false)
複製程式碼

4.4 執行器(executor)

執行器顧名思義是執行排程任務的模組。最常用的 executor 有兩種:ProcessPoolExecutor 和 ThreadPoolExecutor

下面是顯式設定 job store(使用mongo儲存)和 executor 的程式碼的示例。

from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor


def my_job():
    print 'hello world'
host = '127.0.0.1'
port = 27017
client = MongoClient(host,port)

jobstores = {
    'mongo': MongoDBJobStore(collection='job',database='test',client=client),'default': MemoryJobStore()
}
executors = {
    'default': ThreadPoolExecutor(10),'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
    'coalesce': False,'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults)
scheduler.add_job(my_job,seconds=5)

try:
    scheduler.start()
except SystemExit:
    client.close()

複製程式碼