1. 程式人生 > >APScheduler Guide 翻譯:用法,相關說明

APScheduler Guide 翻譯:用法,相關說明

配置排程器

APScheduler提供多種方式來配置排程器。可以使用字典進行配置,也可以通過宣告一些關鍵變數。也可以先例項化一個排程器,然後去配置它。

不同調度器的配置共同選項可以再BaseSceduler裡檢視。該類的子類額外的配置選項可以到子類相關的API說明裡找到。作業倉庫和處理器也是如此。

假設你想使用BackgroundScheduler排程器,並且想使用預設的作業倉庫和預設的處理器在你的應用程式裡,程式碼可如下:

from apscheduler.schedulers.background import BackgroundScheduler


scheduler = BackgroundScheduler()

# Initialize the rest of the application here, or before the scheduler initialization

此時你建立的排程器將採用記憶體儲存,並且使用執行緒池處理器且最大執行緒為10。現在,假設你需要額外的配置。如你想使用兩個作業倉庫,兩個處理器,並且想為你的排程器設定不同的時區和為作業倉庫設定不同的預設值。我們將設你需要的是這些:

  1. MongoDBJobStore 名為mongo
  2. SQLAlchemyJobStore名為default(使用SQLite)
  3. 執行緒池處理器名為default,執行緒數為20
  4. 程序池處理器名為processpool,程序數為5
  5. UTC作為排程器的時區
  6. 預設關閉新的作業
  7. default作業倉庫最大儲存個數為3個

有如下策略:

方法1:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法2:

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

方法3:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

啟動排程器

呼叫start()方法即可。對於BlockingScheduler,必須初始化之前才能開啟,因為一旦開始,就不允許再進行設定。其他型別的排程器,開始之後,仍然可以進行相關設定。

新增作業

方法1.呼叫add_job()

方法2.使用裝飾器函式scheduled_job()

方法1最為常用, add_job()函式會返回一個Job例項,方便你去修改或者刪除作業。而如果作業在應用程式執行時不會更改,那是用方法2將會十分方便。

在排程器執行時,你可以隨時排程作業。但如果當新增作業時,排程器沒有執行。將會臨時排程一下作業,但此次排程時間不會記為作業的首次執行時間。只有當排程器執行時,才會被記作首次執行時間。

有必要提醒的是,如果你使用一個處理器或者作業倉庫序列執行作業,需要滿足下列需求:

  1. 目標作業必須全域性可以訪問
  2. 被呼叫的變數必須序列

關於內建的作業倉庫,只有記憶體倉庫不需要序列作業,關於內建的處理器,只有程序池處理器需要序列作業。

如果你打算在你應用程式初始化時,使用持久作業倉庫排程作業,你必須為每個作業設定一個精確的ID。使用replace_existing=True,否則你需要每次在你的應用程式重啟時,必須將原有的作業拷貝。如果想要新增作業後立即執行,則不必設定觸發器。

移除作業

從排程器移除作業之後,作業將會從作業倉庫移除,並不再被執行。有兩種方法:

  1. 呼叫remove_job(),需要傳入job_ID,和作業倉庫的別名
  2. 呼叫remove(),需要傳入作業例項

後一種可能更為方便,但是他需要你新增作業時在某處儲存作業例項。針對使用裝飾器scheduled_job(),只能使用第一種方法。

舉例-方法1

cheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

舉例-方法2

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

如果一個作業不再被排程(比如日期觸發器觸發之後,不再觸發)。作業將會自動被移除。

中止和恢復作業

通過作業例項或者排程器可以很容易地停止或者恢復作業,當作業中止之後,下次執行時間將會被清除,並且不會再有新的執行時間直到作業被恢復。

中止方法如下:

apscheduler.job.Job.pause()

apscheduler.schedulers.base.BaseScheduler.pause_job()

恢復方法如下:

apscheduler.job.Job.resume()

apscheduler.schedulers.base.BaseScheduler.resume_job()

獲取排程作業列表

呼叫get_jobs()將會返回作業例項列表,如果你想獲取某個作業倉庫下的作業,講引數作業倉庫別名傳入。

使用print_jobs()可以打印出下面資訊表:作業、對應的觸發器、下次執行時間

修改作業

呼叫apscheduler.job.Job.modify()或者modify_job(),除了ID無法修改,其餘均可更改。

例如

job.modify(max_instances=6, name='Alternate name')

如果你想更改觸發器,可以使用apscheduler.job.Job.reschedule()或者reschedule_job()。這個方法會為這個作業生成一個新的觸發器,並且會重新計算下次執行的時間。

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

關閉排程器

呼叫scheduler.shutdown()

該方法預設會先關閉作業倉庫和處理器,然後等待當前進行的作業結束後,再關閉。如果想直接關閉,傳入引數wait=Flase。

中止和恢復正在進行的作業

scheduler.pause()中止作業,進入休眠狀態

scheduler.resume()恢復作業,進入喚醒狀態

scheduler.start(paused=True)啟動作業,休眠狀態的將不會喚醒

限制執行作業的例項數

預設情況,在同一時間,一個作業只允許一個例項。這意味著如果上一觸發時間作業還沒有執行完,下一個觸發時間將不會被執行。(下面的英文說下一次的觸發稱作misfire,意思是失火,不好理解。舉個現實的例子,假如我有個任務是每5秒更新一下快取資料,通過REST ful介面獲取,如果又一次發生超時,那麼就沒法獲取最新的資料來快取。這是很嚴重的事故。允許開啟多個例項也就是這個意思。misfire應該是特有的術語,可以理解為要執行的任務沒被執行,就是misfire)指定max_instances就好。

未執行作業

有時會發生排程要執行的作業沒有被執行,這種情況常發生在使用持久的作業倉庫時,要執行的作業因為排程器關閉或者重啟,而使得作業未被執行。這種作業就被成為misfired。排程器會通過作業的misfire_grace_time來監測沒有執行的時間,來看這個作業能被再被觸發。這會使得該作業被成功的執行多次。如果你不想被執行多次,你可以將未被執行的次數合併成一次。設定開啟coalesing就可以了。

如果因為程序池或者執行緒池滿了,而導致作業執行被延後,那處理器會跳過執行延後的作業。你可以通過增加程序數或者執行緒數來解決這個問題。也可以通過設misfire_grace_time來解決這個問題。

(理解一下,指定misfire_grace_time,會把沒有執行的作業放到佇列裡,然後依次執行。如果不想任務在執行多次,開啟coalesing就可以。)

排程器事件

可以為排程器的事件設定監聽器,在某些情況下,可能會產生排程器事件,在某些特定的事件,可能會伴隨著相關的細節資訊。因此有必要堅挺排程器產生的事件。舉例如下

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

問題定位

可以程式碼示例通過如下來定位問題:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)