1. 程式人生 > 程式設計 >Python定時任務APScheduler安裝及使用解析

Python定時任務APScheduler安裝及使用解析

1、簡介

APScheduler是一個 Python 定時任務框架,使用起來十分方便。提供了基於日期、固定時間間隔以及 crontab 型別的任務,並且可以持久化任務、並以 daemon 方式執行應用。

2、APScheduler四個元件

APScheduler 四個元件分別為:觸發器(trigger),作業儲存(job store),執行器(executor),排程器(scheduler)。

觸發器(trigger)

包含排程邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會執行。除了他們自己初始配置意外,觸發器完全是無狀態的

APScheduler 有三種內建的 trigger:

  • date: 特定的時間點觸發
  • interval: 固定時間間隔觸發
  • cron: 在特定時間週期性地觸發

作業儲存(job store)

儲存被排程的作業,預設的作業儲存是簡單地把作業儲存在記憶體中,其他的作業儲存是將作業儲存在資料庫中。一個作業的資料講在儲存在持久化作業儲存時被序列化,並在載入時被反序列化。排程器不能分享同一個作業儲存。
APScheduler 預設使用 MemoryJobStore,可以修改使用 DB 儲存方案

執行器(executor)

處理作業的執行,他們通常通過在作業中提交制定的可呼叫物件到一個執行緒或者進城池來進行。當作業完成時,執行器將會通知排程器。

最常用的 executor 有兩種:

  • ProcessPoolExecutor
  • ThreadPoolExecutor

排程器(scheduler)

通常在應用中只有一個排程器,應用的開發者通常不會直接處理作業儲存、排程器和觸發器,相反,排程器提供了處理這些的合適的介面。配置作業儲存和執行器可以在排程器中完成,例如新增、修改和移除作業

2、安裝

$ pip install apscheduler

接下來我們看下簡單的幾個示例:

===============interval: 固定時間間隔觸發===============
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

def job():
  print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 定義BlockingScheduler
sched = BlockingScheduler()
sched.add_job(job,'interval',seconds=5) 
sched.start()
===============cron: 特定時間週期性地觸發===============
import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):
  t = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
  print('{} --- {}'.format(text,t))

scheduler = BlockingScheduler()
# 在每天22點,每隔 1分鐘 執行一次 job 方法
scheduler.add_job(job,'cron',hour=17,minute='*/1',args=['job1'])
# 在每天22和23點的25分,執行一次 job 方法
scheduler.add_job(job,hour='22-23',minute='25',args=['job2'])

scheduler.start()

通過裝飾器scheduled_job()新增方法

import time
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

@scheduler.scheduled_job('interval',seconds=5)
def job1():
  t = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
  print('job1 --- {}'.format(t))

@scheduler.scheduled_job('cron',second='*/7')
def job2():
  t = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
  print('job2 --- {}'.format(t))

scheduler.start()

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。