如何讓你的Python程式,定時定點地去執行任務?
apscheduler 的使用
我們專案中總是避免不了要使用一些定時任務,比如說最近的專案,使用者點選報名考試以後需要在考試日期臨近的時候推送小程式訊息提醒到客戶微信上,翻了翻 fastapi 中的實現,雖然方法和包也不少,但是要不就是太重了(比如需要再開服務,還要依賴 redis,都不好用),雖然也可以使用 time 模組的 time.sleep()機上 fastapi 的後臺任務變相實現,但是相對簡單的功能還行,複雜點的程式碼起來就麻煩了,所以還是專人專事找個負責這個額的包吧。找來找去發現 APScheduler 就挺適合,程式碼簡單,實現效果也很好,這裡做個記錄!
安裝
pip install apscheduler
主要組成部分
概念性東西,混個臉熟,程式碼比這些定義好理解。
觸發器(trigger)包含排程邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會執行。除了他們自己初始配置意外,觸發器完全是無狀態的。說人話就是你指定那種方式觸發當前的任務。
乾貨主要有:
① 200 多本 Python 電子書(和經典的書籍)應該有
② Python標準庫資料(最全中文版)
③ 專案原始碼(四五十個有趣且可靠的練手專案及原始碼)
④ Python基礎入門、爬蟲、網路開發、大資料分析方面的視訊(適合小白學習)
⑤ Python學習路線圖(告別不入流的學習)
Python學習交流Q群101677771
型別 |
解釋 |
DateTrigger |
到期執行(到xxxx年x月x日 x時x分x秒執行) 對應DateTrigger |
IntervalTrigger |
間隔執行(每5秒執行一次) |
CronTrigger |
一個crontab型別的條件(這個比較複雜,比如週一到週四的4-5點每5秒執行一次) |
作業儲存(job store)儲存被排程的作業,預設的作業儲存是簡單地把作業儲存在記憶體中,其他的作業儲存是將作業儲存在資料庫中。一個作業的資料講在儲存在持久化作業儲存時被序列化,並在載入時被反序列化。排程器不能分享同一個作業儲存。
Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態新增Jobstore。每個jobstore
都會繫結一個alias,scheduler在Add Job時,根據指定的jobstore在scheduler中找到相應的jobstore,並
將job新增到jobstore中。
Jobstore主要是通過pickle庫的loads和dumps【實現核心是通過python的__getstate__和__setstate__重寫
實現】,每次變更時將Job動態儲存到儲存中,使用時再動態的加載出來,作為儲存的可以是redis,也可以
是資料庫【通過sqlarchemy這個庫整合多種資料庫】,也可以是mongodb等
目前APScheduler支援的Jobstore:
MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore
執行器(executor)處理作業的執行,他們通常通過在作業中提交制定的可呼叫物件到一個執行緒或者進城池來進行。當作業完成時,執行器將會通知排程器。
- 說人話就是新增任務時候用它來包裝的,executor的種類會根據不同的排程來選擇,如果選擇AsyncIO作為排程的庫,那麼選擇AsyncIOExecutor,如果選擇tornado作為排程的庫,選擇TornadoExecutor,如果選擇啟動程序作為排程,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以Executor的選擇需要根據實際的scheduler來選擇不同的執行器
目前APScheduler支援的Executor:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor
排程器(scheduler)是其他的組成部分。你通常在應用只有一個排程器,應用的開發者通常不會直接處理作業儲存、排程器和觸發器,相反,排程器提供了處理這些的合適的介面。配置作業儲存和執行器可以在排程器中完成,例如新增、修改和移除作業.
Scheduler是APScheduler的核心,所有相關元件通過其定義。scheduler啟動之後,將開始按照配置的任務進行排程。
除了依據所有定義Job的trigger生成的將要排程時間喚醒排程之外。當發生Job資訊變更時也會觸發排程。
scheduler可根據自身的需求選擇不同的元件,如果是使用AsyncIO則選擇AsyncIOScheduler,使用tornado則
選擇TornadoScheduler。
目前APScheduler支援的Scheduler:
AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler
簡單應用
import time
from apscheduler.schedulers.blocking import BlockingScheduler # 引入後臺
def my_job():
print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5)
sched.start()
完整程式碼
# trigeers 觸發器
# job stores job 儲存
# executors 執行器
# schedulers 排程器
from pytz import utc
from sqlalchemy import func
from apscheduler.schedulers.background import BackgroundScheduler,AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
# 可以配置多個儲存
#'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') # SQLAlchemyJobStore指定儲存連結
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20}, # 最大工作執行緒數20
'processpool': ProcessPoolExecutor(max_workers=5) # 最大工作程序數為5
}
job_defaults = {
'coalesce': False, # 關閉新job的合併,當job延誤或者異常原因未執行時
'max_instances': 3 # 併發執行新job預設最大例項多少
}
scheduler = BackgroundScheduler()
# .. do something else here, maybe add jobs etc.
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作為排程程式的時區
import os
import time
def print_time(name):
print(f'{name} - {time.ctime()}')
def add_job(job_id, func, args, seconds):
"""新增job"""
print(f"新增間隔執行任務job - {job_id}")
scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)
def add_coun_job(job_id, func, args, start_time):
"""新增job"""
print(f"新增一次執行任務job - {job_id}")
scheduler.add_job(id=job_id, func=func, args=args, trigger='date',timezone='Asia/Shanghai', run_date=start_time)
# scheduler.add_job(func=print_time, trigger='date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])
def remove_job(job_id):
"""移除job"""
scheduler.remove_job(job_id)
print(f"移除job - {job_id}")
def pause_job(job_id):
"""停止job"""
scheduler.pause_job(job_id)
print(f"停止job - {job_id}")
def resume_job(job_id):
"""恢復job"""
scheduler.resume_job(job_id)
print(f"恢復job - {job_id}")
def get_jobs():
"""獲取所有job資訊,包括已停止的"""
res = scheduler.get_jobs()
print(f"所有job - {res}")
def print_jobs():
print(f"詳細job資訊")
scheduler.print_jobs()
def start():
"""啟動排程器"""
scheduler.start()
def shutdown():
"""關閉排程器"""
scheduler.shutdown()
if __name__ == '__main__':
scheduler = BackgroundScheduler()
# start()
# print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
# add_job('job_A', func=print_time, args=("A", ), seconds=1)
# add_job('job_B', func=print_time, args=("B", ), seconds=2)
# time.sleep(6)
# pause_job('job_A') # 停止a
# get_jobs() #得到所有job
# time.sleep(6)
# print_jobs()
# resume_job('job_A')
# time.sleep(6)
# remove_job('job_A')
# time.sleep(6)
from datetime import datetime
import pytz
start()
date_temp = datetime(2022, 2, 19, 17, 30, 5)
# scheduler.add_job(print_time, 'date', run_date=datetime.now(pytz.timezone('America/Manaus')), args=['text'])
# scheduler.add_job(print_time, 'date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])
add_coun_job(job_id="job_C",func=print_time,args=('一次性執行任務',),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone())
time.sleep(130)
try:
shutdown()
except RuntimeError:
pass