1. 程式人生 > 其它 >【python】APScheduler(Python化的Cron)使用總結 定時任務 APScheduler(Python化的Cron)使用總結 定時任務

【python】APScheduler(Python化的Cron)使用總結 定時任務 APScheduler(Python化的Cron)使用總結 定時任務

原文連結: https://www.cnblogs.com/zhaoyingjie/p/9664081.html

APScheduler(Python化的Cron)使用總結 定時任務

APScheduler(Python化的Cron)使用總結

簡介

APScheduler全程為Advanced Python Scheduler,是一款輕量級的Python任務排程框架。它允許你像Cron那樣安排定期執行的任務,並且支援Python函式或任意可呼叫的物件。官方文件:https://apscheduler.readthedocs.io/en/latest/userguide.html#basic-concepts

APScheduler安裝

方法一:使用pip安裝

$ pip install apscheduler 
方法二:如果pip不起作用,可以從pypi上下載最新的原始碼包(https://pypi.python.org/pypi/APScheduler/)進行安裝:

$ python setup.py install 
APScheduler元件

triggers(觸發器): 觸發器中包含排程邏輯,每個作業都由自己的觸發器來決定下次執行時間。除了他們自己初始配置意外,觸發器完全是無狀態的。

job stores(作業儲存器):儲存被排程的作業,預設的作業儲存器只是簡單地把作業儲存在記憶體中,其他的作業儲存器則是將作業儲存在資料庫中。當作業被儲存到一個持久化的作業儲存器中的時候,該作業的資料會被序列化,並在載入時被反序列化。作業儲存器不能共享排程器。

executors(執行器):處理作業的執行,他們通常通過在作業中提交指定的可呼叫物件到一個執行緒或者進城池來進行。當作業完成時,執行器將會通知排程器。

schedulers(排程器):配置作業儲存器和執行器可以在排程器中完成,例如新增、修改和移除作業。根據不同的應用場景可以選用不同的排程器,可選的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7種。

排程器

BlockingScheduler : 當排程器是你應用中唯一要執行的東西時。 
BackgroundScheduler : 當你沒有執行任何其他框架並希望排程器在你應用的後臺執行時使用(充電樁即使用此種方式)。 
AsyncIOScheduler : 當你的程式使用了asyncio(一個非同步框架)的時候使用。 
GeventScheduler : 當你的程式使用了gevent(高效能的Python併發框架)的時候使用。 
TornadoScheduler : 當你的程式基於Tornado(一個web框架)的時候使用。 
TwistedScheduler : 當你的程式使用了Twisted(一個非同步框架)的時候使用 
QtScheduler : 如果你的應用是一個Qt應用的時候可以使用。 
作業儲存器

如果你的應用在每次啟動的時候都會重新建立作業,那麼使用預設的作業儲存器(MemoryJobStore)即可,但是如果你需要在排程器重啟或者應用程式奔潰的情況下任然保留作業,你應該根據你的應用環境來選擇具體的作業儲存器。例如:使用Mongo或者SQLAlchemy JobStore (用於支援大多數RDBMS)

執行器

對執行器的選擇取決於你使用上面哪些框架,大多數情況下,使用預設的ThreadPoolExecutor已經能夠滿足需求。如果你的應用涉及到CPU密集型操作,你可以考慮使用ProcessPoolExecutor來使用更多的CPU核心。你也可以同時使用兩者,將ProcessPoolExecutor作為第二執行器。

觸發器

當你排程作業的時候,你需要為這個作業選擇一個觸發器,用來描述這個作業何時被觸發,APScheduler有三種內建的觸發器型別:

date 一次性指定日期 
interval 在某個時間範圍內間隔多長時間執行一次 
cron 和Linux crontab格式相容,最為強大 
date

最基本的一種排程,作業只會執行一次。它的引數如下:

run_date (datetime|str) – 作業的執行日期或時間 
timezone (datetime.tzinfo|str) – 指定時區 
舉個栗子:

# 2016-12-12執行一次job_function
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])
# 2016-12-12 12:00:00執行一次job_function
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])
interval

 

間隔排程,引數如下:

weeks (int) – 間隔幾周 
days (int) – 間隔幾天 
hours (int) – 間隔幾小時 
minutes (int) – 間隔幾分鐘 
seconds (int) – 間隔多少秒 
start_date (datetime|str) – 開始日期 
end_date (datetime|str) – 結束日期 
timezone (datetime.tzinfo|str) – 時區 
舉個栗子:

 

# 每兩個小時調一下job_function
sched.add_job(job_function, 'interval', hours=2)
cron

 

引數如下:

year (int|str) – 年,4位數字 
month (int|str) – 月 (範圍1-12) 
day (int|str) – 日 (範圍1-31) 
week (int|str) – 周 (範圍1-53) 
day_of_week (int|str) – 周內第幾天或者星期幾 (範圍0-6 或者 mon,tue,wed,thu,fri,sat,sun) 
hour (int|str) – 時 (範圍0-23) 
minute (int|str) – 分 (範圍0-59) 
second (int|str) – 秒 (範圍0-59) 
start_date (datetime|str) – 最早開始日期(包含) 
end_date (datetime|str) – 最晚結束時間(包含) 
timezone (datetime.tzinfo|str) – 指定時區 
舉個栗子:

 

# job_function將會在6,7,8,11,12月的第3個週五的1,2,3點執行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 截止到2016-12-30 00:00:00,每週一到週五早上五點半執行job_function
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')

 

配置排程程式

在應用程式中使用預設作業儲存和預設執行程式執行BackgroundScheduler的例子:

 

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
    # Initialize the rest of the application here, or before the scheduler initialization

 

這將生成一個名為“default”的MemoryJobStore和名為“default”的ThreadPoolExecutor的BackgroundScheduler,預設最大執行緒數為10。

如果不滿足於當前配置,如希望使用兩個執行器有兩個作業儲存器,並且還想要調整新作業的預設值並設定不同的時區,可按如下配置:

 

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

 # 配置作業儲存器
jobstores = {      
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 配置執行器,並設定執行緒數
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,     # 預設情況下關閉新的作業
    'max_instances': 3     # 設定排程程式將同時執行的特定作業的最大例項數3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

 

 

啟動排程器

啟動排程器只需要呼叫start()方法,對於除BlockingScheduler以外的排程程式,此呼叫將立即返回,您可以繼續應用程式的初始化過程,可能會將作業新增到排程程式。

對於BlockingScheduler,只需在完成初始化步驟後呼叫start()

 

scheduler.start()

 

新增作業

方法一:呼叫add_job()方法

最常見的方法,add_job()方法返回一個apscheduler.job.Job例項,您可以稍後使用它來修改或刪除該作業。

方法二:使用裝飾器scheduled_job()

此方法主要是方便的宣告在應用程式執行時不會改變的作業

刪除作業

方法一:通過作業ID或別名呼叫remove_job()刪除作業

方法二:通過add_job()返回的job例項呼叫remove()方法刪除作業

舉個栗子:

# 例項刪除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
 # id刪除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

 

暫停和恢復作業

可以通過Job例項或排程程式本身輕鬆暫停和恢復作業。 當作業暫停時,下一個執行時間將被清除,直到作業恢復,不會再計算執行時間。 要暫停作業,請使用以下任一方法:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

 

 

恢復作業:

apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

 

獲取作業列表

要獲得計劃作業的機器可處理列表,可以使用get_jobs()方法。 它將返回一個Job例項列表。 如果您只對特定作業儲存中包含的作業感興趣,則將作業儲存別名作為第二個引數。

為了方便起見,可以使用print_jobs()方法,它將列印格式化的作業列表,觸發器和下次執行時間。

修改作業屬性

您可以通過呼叫apscheduler.job.Job.modify()或modify_job()來修改除id以外的任何作業屬性。

 

job.modify(max_instances=6, name='Alternate name')

 

關閉排程程式

預設情況下,排程程式關閉其作業儲存和執行程式,並等待所有當前正在執行的作業完成,wait=False引數可選,代表立即停止,不用等待。

scheduler.shutdown(wait=False) 
附:1、定時任務執行指令碼小例子:

 

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler()   # 後臺執行

 # 設定為每日凌晨00:30:30時執行一次排程程式
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
        print "schedule execute"
        sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == '__main__':
    try:
        scheduler.start()
        sys_logging.debug("statistic scheduler start success")
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        sys_logging.debug("statistic scheduler start-up fail")

 

2、第一次使用此定時器時總會執行兩次,一直不知道為什麼,後來發現,python 的flask框架在debug模式下會多開一個執行緒監測專案變化,所以每次會跑兩遍,可以將debug選項改為False

APScheduler(Python化的Cron)使用總結

簡介

APScheduler全程為Advanced Python Scheduler,是一款輕量級的Python任務排程框架。它允許你像Cron那樣安排定期執行的任務,並且支援Python函式或任意可呼叫的物件。官方文件:https://apscheduler.readthedocs.io/en/latest/userguide.html#basic-concepts

APScheduler安裝

方法一:使用pip安裝

$ pip install apscheduler 
方法二:如果pip不起作用,可以從pypi上下載最新的原始碼包(https://pypi.python.org/pypi/APScheduler/)進行安裝:

$ python setup.py install 
APScheduler元件

triggers(觸發器): 觸發器中包含排程邏輯,每個作業都由自己的觸發器來決定下次執行時間。除了他們自己初始配置意外,觸發器完全是無狀態的。

job stores(作業儲存器):儲存被排程的作業,預設的作業儲存器只是簡單地把作業儲存在記憶體中,其他的作業儲存器則是將作業儲存在資料庫中。當作業被儲存到一個持久化的作業儲存器中的時候,該作業的資料會被序列化,並在載入時被反序列化。作業儲存器不能共享排程器。

executors(執行器):處理作業的執行,他們通常通過在作業中提交指定的可呼叫物件到一個執行緒或者進城池來進行。當作業完成時,執行器將會通知排程器。

schedulers(排程器):配置作業儲存器和執行器可以在排程器中完成,例如新增、修改和移除作業。根據不同的應用場景可以選用不同的排程器,可選的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7種。

排程器

BlockingScheduler : 當排程器是你應用中唯一要執行的東西時。 
BackgroundScheduler : 當你沒有執行任何其他框架並希望排程器在你應用的後臺執行時使用(充電樁即使用此種方式)。 
AsyncIOScheduler : 當你的程式使用了asyncio(一個非同步框架)的時候使用。 
GeventScheduler : 當你的程式使用了gevent(高效能的Python併發框架)的時候使用。 
TornadoScheduler : 當你的程式基於Tornado(一個web框架)的時候使用。 
TwistedScheduler : 當你的程式使用了Twisted(一個非同步框架)的時候使用 
QtScheduler : 如果你的應用是一個Qt應用的時候可以使用。 
作業儲存器

如果你的應用在每次啟動的時候都會重新建立作業,那麼使用預設的作業儲存器(MemoryJobStore)即可,但是如果你需要在排程器重啟或者應用程式奔潰的情況下任然保留作業,你應該根據你的應用環境來選擇具體的作業儲存器。例如:使用Mongo或者SQLAlchemy JobStore (用於支援大多數RDBMS)

執行器

對執行器的選擇取決於你使用上面哪些框架,大多數情況下,使用預設的ThreadPoolExecutor已經能夠滿足需求。如果你的應用涉及到CPU密集型操作,你可以考慮使用ProcessPoolExecutor來使用更多的CPU核心。你也可以同時使用兩者,將ProcessPoolExecutor作為第二執行器。

觸發器

當你排程作業的時候,你需要為這個作業選擇一個觸發器,用來描述這個作業何時被觸發,APScheduler有三種內建的觸發器型別:

date 一次性指定日期 
interval 在某個時間範圍內間隔多長時間執行一次 
cron 和Linux crontab格式相容,最為強大 
date

最基本的一種排程,作業只會執行一次。它的引數如下:

run_date (datetime|str) – 作業的執行日期或時間 
timezone (datetime.tzinfo|str) – 指定時區 
舉個栗子:

# 2016-12-12執行一次job_function
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])
# 2016-12-12 12:00:00執行一次job_function
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])
interval

 

間隔排程,引數如下:

weeks (int) – 間隔幾周 
days (int) – 間隔幾天 
hours (int) – 間隔幾小時 
minutes (int) – 間隔幾分鐘 
seconds (int) – 間隔多少秒 
start_date (datetime|str) – 開始日期 
end_date (datetime|str) – 結束日期 
timezone (datetime.tzinfo|str) – 時區 
舉個栗子:

 

# 每兩個小時調一下job_function
sched.add_job(job_function, 'interval', hours=2)
cron

 

引數如下:

year (int|str) – 年,4位數字 
month (int|str) – 月 (範圍1-12) 
day (int|str) – 日 (範圍1-31) 
week (int|str) – 周 (範圍1-53) 
day_of_week (int|str) – 周內第幾天或者星期幾 (範圍0-6 或者 mon,tue,wed,thu,fri,sat,sun) 
hour (int|str) – 時 (範圍0-23) 
minute (int|str) – 分 (範圍0-59) 
second (int|str) – 秒 (範圍0-59) 
start_date (datetime|str) – 最早開始日期(包含) 
end_date (datetime|str) – 最晚結束時間(包含) 
timezone (datetime.tzinfo|str) – 指定時區 
舉個栗子:

 

# job_function將會在6,7,8,11,12月的第3個週五的1,2,3點執行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 截止到2016-12-30 00:00:00,每週一到週五早上五點半執行job_function
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')

 

配置排程程式

在應用程式中使用預設作業儲存和預設執行程式執行BackgroundScheduler的例子:

 

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
    # Initialize the rest of the application here, or before the scheduler initialization

 

這將生成一個名為“default”的MemoryJobStore和名為“default”的ThreadPoolExecutor的BackgroundScheduler,預設最大執行緒數為10。

如果不滿足於當前配置,如希望使用兩個執行器有兩個作業儲存器,並且還想要調整新作業的預設值並設定不同的時區,可按如下配置:

 

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

 # 配置作業儲存器
jobstores = {      
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 配置執行器,並設定執行緒數
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,     # 預設情況下關閉新的作業
    'max_instances': 3     # 設定排程程式將同時執行的特定作業的最大例項數3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

 

 

啟動排程器

啟動排程器只需要呼叫start()方法,對於除BlockingScheduler以外的排程程式,此呼叫將立即返回,您可以繼續應用程式的初始化過程,可能會將作業新增到排程程式。

對於BlockingScheduler,只需在完成初始化步驟後呼叫start()

 

scheduler.start()

 

新增作業

方法一:呼叫add_job()方法

最常見的方法,add_job()方法返回一個apscheduler.job.Job例項,您可以稍後使用它來修改或刪除該作業。

方法二:使用裝飾器scheduled_job()

此方法主要是方便的宣告在應用程式執行時不會改變的作業

刪除作業

方法一:通過作業ID或別名呼叫remove_job()刪除作業

方法二:通過add_job()返回的job例項呼叫remove()方法刪除作業

舉個栗子:

# 例項刪除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
 # id刪除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

 

暫停和恢復作業

可以通過Job例項或排程程式本身輕鬆暫停和恢復作業。 當作業暫停時,下一個執行時間將被清除,直到作業恢復,不會再計算執行時間。 要暫停作業,請使用以下任一方法:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

 

 

恢復作業:

apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

 

獲取作業列表

要獲得計劃作業的機器可處理列表,可以使用get_jobs()方法。 它將返回一個Job例項列表。 如果您只對特定作業儲存中包含的作業感興趣,則將作業儲存別名作為第二個引數。

為了方便起見,可以使用print_jobs()方法,它將列印格式化的作業列表,觸發器和下次執行時間。

修改作業屬性

您可以通過呼叫apscheduler.job.Job.modify()或modify_job()來修改除id以外的任何作業屬性。

 

job.modify(max_instances=6, name='Alternate name')

 

關閉排程程式

預設情況下,排程程式關閉其作業儲存和執行程式,並等待所有當前正在執行的作業完成,wait=False引數可選,代表立即停止,不用等待。

scheduler.shutdown(wait=False) 
附:1、定時任務執行指令碼小例子:

 

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler()   # 後臺執行

 # 設定為每日凌晨00:30:30時執行一次排程程式
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
        print "schedule execute"
        sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == '__main__':
    try:
        scheduler.start()
        sys_logging.debug("statistic scheduler start success")
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        sys_logging.debug("statistic scheduler start-up fail")

 

2、第一次使用此定時器時總會執行兩次,一直不知道為什麼,後來發現,python 的flask框架在debug模式下會多開一個執行緒監測專案變化,所以每次會跑兩遍,可以將debug選項改為False