1. 程式人生 > 程式設計 >Python任務排程利器之APScheduler詳解

Python任務排程利器之APScheduler詳解

Python任務排程利器之APScheduler詳解

任務排程應用場景

所謂的任務排程是指安排任務的執行計劃,即何時執行,怎麼執行等。在現實專案中經常出現它們的身影;特別是資料類專案,比如實時統計每5分鐘網站的訪問量,就需要每5分鐘定時從日誌資料分析訪問量。

總結下任務排程應用場景:

  • 離線作業排程:按時間粒度執行某項任務
  • 共享快取更新:定時重新整理快取,如redis快取;不同程序間的共享資料

任務排程工具

  • linux的crontab, 支援按照分鐘/小時/天/月/周粒度,執行任務
  • java的Quartz
  • windows的任務計劃

本文介紹的是python中的任務排程庫,APScheduler(advance python scheduler)。如果你瞭解Quartz的話,可以看出APScheduler是Quartz的python實現;APScheduler提供了基於時間,固定時間點和crontab方式的任務呼叫方案, 可以當作一個跨平臺的排程工具來使用。

APScheduler

元件介紹

APScheduler由5個部分組成:觸發器、排程器、任務儲存器、執行器和任務事件。

  • 任務job:任務id和任務執行func
  • 觸發器triggers:確定任務何時開始執行
  • 任務儲存器job stores: 儲存任務的狀態
  • 執行器executors:確定任務怎麼執行
  • 任務事件event:監控任務執行異常情況
  • 排程器schedulers:串聯任務的整個生命週期,新增編輯任務到任務儲存器,在任務的執行時間到來時,把任務交給執行器執行返回結果;同時發出事件監聽,監控任務事件 。

Python任務排程利器之APScheduler詳解

安裝

pip install apscheduler

簡單例子

from apscheduler.schedulers.background import BackgroundScheduler 
from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor 
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore 
from apscheduler.events import EVENT_JOB_EXECUTED,EVENT_JOB_ERROR 
import logging 
import datetime 
# 任務執行函式 
def job_func(job_id): 
 print('job %s is runed at %s' % (job_id,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) 
# 事件監聽 
def job_exception_listener(event): 
 if event.exception: 
 # todo:異常處理,告警等 
 print('The job crashed :(') 
 else: 
 print('The job worked :)') 
# 日誌 
logging.basicConfig() 
logging.getLogger('apscheduler').setLevel(logging.DEBUG) 
# 定義一個後臺任務非阻塞排程器 
scheduler = BackgroundScheduler() 
# 新增一個任務到記憶體中 
# 觸發器:trigger='interval' seconds=10 每10s觸發執行一次 
# 執行器:executor='default' 執行緒執行 
# 任務儲存器:jobstore='default' 預設記憶體儲存 
# 最大併發數:max_instances 
scheduler.add_job(job_func,trigger='interval',args=[1],id='1',name='a test job',max_instances=10,jobstore='default',executor='default',seconds=10) 
# 設定任務監聽 
scheduler.add_listener(job_exception_listener,EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) 
# 啟動排程器 
scheduler.start() 

執行情況:

job 1 is runed at 2020-03-21 20:00:38
The job worked :)
job 1 is runed at 2020-03-21 20:00:48
The job worked :)
job 1 is runed at 2020-03-21 20:00:58
The job worked :)

觸發器

觸發器決定何時執行任務,APScheduler支援的觸發器有3種

trigger='interval':按固定時間週期執行,支援weeks,days,hours,minutes, seconds, 還可指定時間範圍

sched.add_job(job_function,'interval',hours=2,start_date='2010-10-10 09:30:00',end_date='2014-06-15 11:00:00')

trigger='date': 固定時間,執行一次

sched.add_job(my_job,'date',run_date=datetime(2009,11,6,16,30,5),args=['text'])

trigger='cron': 支援crontab方式,執行任務

引數:分鐘/小時/天/月/周粒度,也可指定時間範圍

year (int|str) – 4-digit year 
 month (int|str) – month (1-12) 
 day (int|str) – day of the (1-31) 
 week (int|str) – ISO week (1-53) 
 day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) 
 hour (int|str) – hour (0-23) 
 minute (int|str) – minute (0-59) 
 second (int|str) – second (0-59) 
 start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) 
 end_date (datetime|str) – latest possible date/time to trigger on (inclusive) 

例子

# 星期一到星期五,5點30執行任務job_function,直到2014-05-30 00:00:00 
  sched.add_job(job_function,'cron',day_of_week='mon-fri',hour=5,minute=30,end_date='2014-05-30') 
  # 按照crontab格式執行, 格式為:分鐘 小時 天 月 周,*表示所有 
  # 5月到8月的1號到15號,0點0分執行任務job_function 
  sched.add_job(job_function,CronTrigger.from_crontab('0 0 1-15 may-aug *')) 

執行器

執行器決定如何執行任務;APScheduler支援4種不同執行器,常用的有pool(執行緒/程序)和gevent(io多路複用,支援高併發),預設為pool中執行緒池, 不同的執行器可以在排程器的配置中進行配置(見排程器)

  • apscheduler.executors.asyncio:同步io,阻塞
  • apscheduler.executors.gevent:io多路複用,非阻塞
  • apscheduler.executors.pool: 執行緒ThreadPoolExecutor和程序ProcessPoolExecutor
  • apscheduler.executors.twisted:基於事件驅動

任務儲存器

任務儲存器決定任務的儲存方式, 預設儲存在記憶體中(MemoryJobStore),重啟後就沒有了。APScheduler支援的任務儲存器有:

  • apscheduler.jobstores.memory:記憶體
  • apscheduler.jobstores.mongodb:儲存在mongodb
  • apscheduler.jobstores.redis:儲存在redis
  • apscheduler.jobstores.rethinkdb:儲存在rethinkdb
  • apscheduler.jobstores.sqlalchemy:支援sqlalchemy的資料庫如mysql,sqlite等
  • apscheduler.jobstores.zookeeper:zookeeper

不同的任務儲存器可以在排程器的配置中進行配置(見排程器)

排程器

APScheduler支援的排程器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler

  • BlockingScheduler:適用於排程程式是程序中唯一執行的程序,呼叫start函式會阻塞當前執行緒,不能立即返回。
  • BackgroundScheduler:適用於排程程式在應用程式的後臺執行,呼叫start後主執行緒不會阻塞。
  • AsyncIOScheduler:適用於使用了asyncio模組的應用程式。
  • GeventScheduler:適用於使用gevent模組的應用程式。
  • TwistedScheduler:適用於構建Twisted的應用程式。
  • QtScheduler:適用於構建Qt的應用程式。

從前面的例子,我們可以看到,排程器可以操作任務(併為任務指定觸發器、任務儲存器和執行器)和監控任務。

scheduler.add_job(job_func,seconds=10)

我們來詳細看下各個部分

排程器配置:在add_job我們看到jobstore和executor都是default,APScheduler在定義排程器時可以指定不同的任務儲存和執行器,以及初始的引數

from pytz import utc 
 from apscheduler.schedulers.background import BackgroundScheduler 
 from apscheduler.jobstores.mongodb import MongoDBJobStore 
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore 
 from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor 
 # 通過dict方式執行不同的jobstores、executors和預設的引數 
 jobstores = { 
 'mongo': MongoDBJobStore(),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') 
 } 
 executors = { 
 'default': ThreadPoolExecutor(20),'processpool': ProcessPoolExecutor(5) 
 } 
 job_defaults = { 
 'coalesce': False,'max_instances': 3 
 } 
 # 定義排程器 
 scheduler = BackgroundScheduler(jobstoresjobstores=jobstores,executorsexecutors=executors,job_defaultsjob_defaults=job_defaults,timezone=utc) 
 def job_func(job_id): 
 print('job %s is runed at %s' % (job_id,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) 
 # 新增任務 
 scheduler.add_job(job_func,executor='processpool',seconds=10) 
 # 啟動排程器 
 scheduler.start() 

操作任務:排程器可以增加,刪除,暫停,恢復和修改任務。需要注意的是這裡的操作只是對未執行的任務起作用,已經執行和正在執行的任務不受這些操作的影響。

add_job

scheduler.add_job(job_func,seconds=10)

remove_job: 通過任務唯一的id,刪除的時候對應的任務儲存器裡記錄也會刪除

scheduler.add_job(myfunc,minutes=2,id='my_job_id') 
 scheduler.remove_job('my_job_id') 

Pausing and resuming jobs:暫停和重啟任務

scheduler.add_job(myfunc,id='my_job_id') 
 scheduler.pause_job('my_job_id') 
 scheduler.resume_job('my_job_id') 

Modifying jobs:修改任務的配置

job = scheduler.add_job(myfunc,id='my_job_id',max_instances=10) 
 # 修改任務的屬性 
 job.modify(max_instances=6,name='Alternate name') 
 # 修改任務的觸發器 
 scheduler.reschedule_job('my_job_id',trigger='cron',minute='*/5') 

監控任務事件型別,比較常用的型別有:

  • EVENT_JOB_ERROR: 表示任務在執行過程的出現異常觸發
  • EVENT_JOB_EXECUTED:任務執行成功時
  • EVENT_JOB_MAX_INSTANCES:排程器上執行的任務超過配置的引數時

scheduler.add_listener(job_exception_listener,EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

總結

到此這篇關於Python任務排程利器之APScheduler詳解的文章就介紹到這了,更多相關python任務排程 APScheduler內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!