分散式全文檢索引擎ElasticSearch原理及使用例項
文章目錄
-
- 安裝
- 基本概念介紹
- 排程器的工作流程
-
-
- 例項1 -間隔性任務
- 例項2 - cron 任務
-
- 配置排程器
-
-
- 方法一
- 方法二
- 方法三:
-
- 啟動排程器
-
-
- 方法一:使用預設的作業儲存器:
- 方法二:使用資料庫作為儲存器:
-
- 排程器事件監聽
說到定時任務,你會想起 linux 自帶的 crontab ,windows 自帶的任務計劃,都可以實現守時任務。沒錯,作業系統基本都會提供定時任務的實現,但是如果你想要更加精細化的控制,或者說任務程式需要跨平臺執行,最好還是自己實現定時任務框架,Python 的 apscheduler 提供了非常豐富而且方便易用的定時任務介面。本文介紹如何使用 apscheduler 實現你的定時任務。
apscheduler 使用起來十分方便。提供了基於日期、固定時間間隔以及crontab 型別的任務,我們可以在主程式的執行過程中快速增加新作業或刪除舊作業,如果把作業儲存在資料庫中,那麼作業的狀態會被儲存,當排程器重啟時,不必重新新增作業,作業會恢復原狀態繼續執行。apscheduler 可以當作一個跨平臺的排程工具來使用,可以做為 linux 系統crontab 工具或 windows 計劃任務程式的替換。注意,apscheduler 不是一個守護程序或服務,它自身不帶有任何命令列工具。它主要是要在現有的應用程式中執行,也就是說,apscheduler 為我們提供了構建專用排程器或排程服務的基礎模組。
安裝
安裝非常簡單,會用 pip 的人都知道
pin install apscheduler
基本概念介紹
觸發器(triggers):觸發器包含排程邏輯,描述一個任務何時被觸發,按日期或按時間間隔或按 cronjob 表示式三種方式觸發。每個作業都有它自己的觸發器,除了初始配置之外,觸發器是完全無狀態的。
作業儲存器(job stores):作業儲存器指定了作業被存放的位置,預設情況下作業儲存在記憶體,也可將作業儲存在各種資料庫中,當作業被存放在資料庫中時,它會被序列化,當被重新載入時會反序列化。作業儲存器充當儲存、載入、更新和查詢作業的中間商。在排程器之間不能共享作業儲存。
執行器(executors):執行器是將指定的作業(呼叫函式)提交到執行緒池或程序池中執行,當任務完成時,執行器通知排程器觸發相應的事件。
排程器(schedulers):任務排程器,屬於控制角色,通過它配置作業儲存器、執行器和觸發器,新增、修改和刪除任務。排程器協調觸發器、作業儲存器、執行器的執行,通常只有一個排程程式執行在應用程式中,開發人員通常不需要直接處理作業儲存器、執行器或觸發器,配置作業儲存器和執行器是通過排程器來完成的。
排程器的工作流程
例項1 -間隔性任務
# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:01:30
# File Name: ex_interval.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
說明:
第 1 行程式碼宣告檔案內容以 utf-8 編碼,告訴Python 直譯器以 utf-8 編碼解析原始碼檔案。
匯入 datetime 模組,用於列印當前時間。匯入 os 模組,用於判斷作業系統型別。
匯入排程器模組 BlockingScheduler,這是最簡單的排程器,呼叫 start 方阻塞當前程序,如果你的程式只用於排程,除了排程程序外沒有其他後臺程序,那麼請用 BlockingScheduler 非常有用,此時排程程序相當於守護程序。
定義一個函式 tick 代表我們要排程的作業程式。
例項化一個 BlockingScheduler 類,不帶引數表明使用預設的作業儲存器-記憶體,預設的執行器是執行緒池執行器,最大併發執行緒數預設為 10 個(另一個是程序池執行器)。
第 11 行新增一個作業 tick,觸發器為 interval,每隔 3 秒執行一次,另外的觸發器為 date,cron。date 按特定時間點觸發,cron 則按固定的時間間隔觸發。
加入捕捉使用者中斷執行和直譯器退出異常,pass 關鍵字,表示什麼也不做。
執行結果如下所示:
可以看出,每 3 秒打印出了當前時間。
例項2 - cron 任務
# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:21:09
# File Name: ex_cron.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'cron', hour=19,minute=23)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
定時 cron 任務也非常簡單,直接給觸發器 trigger 傳入 ‘cron’ 即可。hour =19 ,minute =23 這裡表示每天的19:23 分執行任務。這裡可以填寫數字,也可以填寫字串
hour =19 , minute =23
hour ='19', minute ='23'
minute = '*/3' 表示每 5 分鐘執行一次
hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各執行一次任務
python 就是這麼靈活、易用、可讀。例 2 執行結果如下:
配置排程器
排程器的主迴圈其實就是反覆檢查是不是有到時需要執行的任務,分以下幾步進行:
- 詢問自己的每一個作業儲存器,有沒有到期需要執行的任務,如果有,計算這些作業中每個作業需要執行的時間點,如果時間點有多個,做 coalesce 檢查。
- 提交給執行器按時間點執行。
在配置排程器前,我們首先要選取適合我們應用環境場景的排程器,儲存器和執行器。下面是各排程器的適用場景:
- BlockingScheduler:適用於排程程式是程序中唯一執行的程序,呼叫start函式會阻塞當前執行緒,不能立即返回。
- BackgroundScheduler:適用於排程程式在應用程式的後臺執行,呼叫start後主執行緒不會阻塞。
- AsyncIOScheduler:適用於使用了asyncio模組的應用程式。
- GeventScheduler:適用於使用gevent模組的應用程式。
- TwistedScheduler:適用於構建Twisted的應用程式。
- QtScheduler:適用於構建Qt的應用程式。
上述排程器可以滿足我們絕大多數的應用環境,本文以兩種排程器為例說明如何進行排程器配置。
作業儲存器的選擇有兩種:一是記憶體,也是預設的配置;二是資料庫。具體選哪一種看我們的應用程式在崩潰時是否重啟整個應用程式,如果重啟整個應用程式,那麼作業會被重新新增到排程器中,此時簡單的選取記憶體作為作業儲存器即簡單又高效。但是,當排程器重啟或應用程式崩潰時您需要您的作業從中斷時恢復正常執行,那麼通常我們選擇將作業儲存在資料庫中,使用哪種資料庫通常取決於為在您的程式設計環境中使用了什麼資料庫。我們可以自由選擇,PostgreSQL 是推薦的選擇,因為它具有強大的資料完整性保護。
同樣的,執行器的選擇也取決於應用場景。通常預設的 ThreadPoolExecutor 已經足夠好。如果作業負載涉及CPU 密集型操作,那麼應該考慮使用 ProcessPoolExecutor,甚至可以同時使用這兩種執行器,將ProcessPoolExecutor 行器新增為二級執行器。
apscheduler 提供了許多不同的方法來配置排程器。可以使用字典,也可以使用關鍵字引數傳遞。首先例項化排程程式,新增作業,然後配置排程器,獲得最大的靈活性。
如果排程程式在應用程式的後臺執行,選擇 BackgroundScheduler,並使用預設的 jobstore 和預設的executor,則以下配置即可:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
假如我們想配置更多資訊:設定兩個執行器、兩個作業儲存器、調整新作業的預設值,並設定不同的時區。下述三個方法是完全等同的。
配置需求
- 配置名為“mongo”的MongoDBJobStore作業儲存器
- 配置名為“default”的SQLAlchemyJobStore(使用SQLite)
- 配置名為“default”的ThreadPoolExecutor,最大執行緒數為20
- 配置名為“processpool”的ProcessPoolExecutor,最大程序數為5
- UTC作為排程器的時區
- coalesce預設情況下關閉
- 作業的預設最大執行例項限制為3
方法一
1 from pytz import utc
2
3 from apscheduler.schedulers.background import BackgroundScheduler
4 from apscheduler.jobstores.mongodb import MongoDBJobStore
5 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
6 from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExec utor
7
8
9 jobstores = {
10 'mongo': MongoDBJobStore(),
11 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
12 }
13 executors = {
14 'default': ThreadPoolExecutor(20),
15 'processpool': ProcessPoolExecutor(5)
16 }
17 job_defaults = {
18 'coalesce': False,
19 'max_instances': 3
20 }
21 scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法二
1 from apscheduler.schedulers.background import BackgroundScheduler
2 scheduler = BackgroundScheduler({
3 'apscheduler.jobstores.mongo': {
4 'type': 'mongodb'
5 },
6 'apscheduler.jobstores.default': {
7 'type': 'sqlalchemy',
8 'url': 'sqlite:///jobs.sqlite'
9 },
10 'apscheduler.executors.default': {
11 'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
12 'max_workers': '20'
13 },
14 'apscheduler.executors.processpool': {
15 'type': 'processpool',
16 'max_workers': '5'
17 },
18 'apscheduler.job_defaults.coalesce': 'false',
19 'apscheduler.job_defaults.max_instances': '3',
20 'apscheduler.timezone': 'UTC',
21 })
方法三:
1 from pytz import utc
2 from apscheduler.schedulers.background import BackgroundScheduler
3 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
4 from apscheduler.executors.pool import ProcessPoolExecutor
5
6 jobstores = {
7 'mongo': {'type': 'mongodb'},
8 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
9 }
10 executors = {
11 'default': {'type': 'threadpool', 'max_workers': 20},
12 'processpool': ProcessPoolExecutor(max_workers=5)
13 }
14 job_defaults = {
15 'coalesce': False,
16 'max_instances': 3
17 }
18 scheduler = BackgroundScheduler()
19
20 # .. do something else here, maybe add jobs etc.
21
以上涵蓋了大多數情況的排程器配置,在實際執行時可以試試不同的配置會有怎樣不同的效果。
啟動排程器
啟動排程器前需要先新增作業,有兩種方法向排程器新增作業:一是通過介面add_job(),二是通過使用函式裝飾器,其中 add_job() 返回一個apscheduler.job.Job類的例項,用於後續修改或刪除作業。
我們可以隨時在排程器上排程作業。如果在新增作業時,排程器還沒有啟動,那麼任務將不會執行,並且第一次執行時間在排程器啟動時計算。
注意:如果使用的是序列化作業的執行器或作業儲存器,那麼要求被呼叫的作業(函式)必須是全域性可訪問的,被呼叫的作業的引數是可序列化的,作業儲存器中,只有 MemoryJobStore 不會序列化作業。執行器中,只有ProcessPoolExecutor 將序列化作業。
啟用排程器只需要呼叫排程器的 start() 方法,下面分別使用不同的作業儲存器來舉例說明:
方法一:使用預設的作業儲存器:
1 #coding:utf-8
2 from apscheduler.schedulers.blocking import BlockingScheduler
3 import datetime
4 from apscheduler.jobstores.memory import MemoryJobStore
5 from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
6
7 def my_job(id='my_job'):
8 print (id,'-->',datetime.datetime.now())
9 jobstores = {
10 'default': MemoryJobStore()
11
12 }
13 executors = {
14 'default': ThreadPoolExecutor(20),
15 'processpool': ProcessPoolExecutor(10)
16 }
17 job_defaults = {
18 'coalesce': False,
19 'max_instances': 3
20 }
21 scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
22 scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True)
23 scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\
24 end_date='2018-05-30')
25 scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now')
26 scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05')
27 try:
28 scheduler.start()
29 except SystemExit:
30 print('exit')
31 exit()
執行結果如下:
job_once_now --> 2018-04-05 07:48:00.967391
job_date_once --> 2018-04-05 07:48:05.005532
job_interval --> 2018-04-05 07:48:05.954023
job_cron --> 2018-04-05 07:48:10.004431
job_interval --> 2018-04-05 07:48:10.942542
job_interval --> 2018-04-05 07:48:15.952208
job_cron --> 2018-04-05 07:48:20.007123
job_interval --> 2018-04-05 07:48:20.952202
……
上述程式碼使用記憶體作為作業儲存器,操作比較簡單,重啟程式相當於第一次執行。
方法二:使用資料庫作為儲存器:
1 #coding:utf-8
2 from apscheduler.schedulers.blocking import BlockingScheduler
3 import datetime
4 from apscheduler.jobstores.memory import MemoryJobStore
5 from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
6 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
7 def my_job(id='my_job'):
8 print (id,'-->',datetime.datetime.now())
9 jobstores = {
10 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
11 }
12 executors = {
13 'default': ThreadPoolExecutor(20),
14 'processpool': ProcessPoolExecutor(10)
15 }
16 job_defaults = {
17 'coalesce': False,
18 'max_instances': 3
19 }
20 scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
21 scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True)
22 scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\
23 end_date='2018-05-30')
24 scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now')
25 scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05')
26 try:
27 scheduler.start()
28 except SystemExit:
29 print('exit')
30 exit()
說明,在第 6 行、第 10 行程式碼修改為資料庫作為作業儲存器
執行結果如下:
Run time of job "my_job (trigger: date[2018-04-05 07:48:05 CST], next run at: 2018-04-05 07:48:05 CST)" was missed by 0:18:28.898146
job_once_now --> 2018-04-05 08:06:34.010194
job_interval --> 2018-04-05 08:06:38.445843
job_cron --> 2018-04-05 08:06:40.154978
job_interval --> 2018-04-05 08:06:43.285941
job_interval --> 2018-04-05 08:06:48.334360
job_cron --> 2018-04-05 08:06:50.172968
job_interval --> 2018-04-05 08:06:53.281743
job_interval --> 2018-04-05 08:06:58.309952
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
提示我們有作業本應在 2018-04-05 07:48:05 執行的作業沒有執行,因為現在的時間為 2018-04-05 08:06:34,錯過了 0:18:28 的時間。
如果將上術程式碼第 21-25 行註釋掉,重新執行本程式,則四種類型的作業仍會執行,結果如下:
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:23.680603
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:13.681604
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:03.681604
……
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:15.687917
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:10.687917
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:05.687917
job_interval --> 2018-04-05 08:14:33.821645
job_interval --> 2018-04-05 08:14:38.529167
job_cron --> 2018-04-05 08:14:40.150080
job_interval --> 2018-04-05 08:14:43.296188
job_interval --> 2018-04-05 08:14:48.327317
作業仍會執行,說明作業被新增到資料庫中,程式中斷後重新執行時會自動從資料庫讀取作業資訊,而不需要重新再新增到排程器中,如果不註釋 21-25 行新增作業的程式碼,則作業會重新新增到資料庫中,這樣就有了兩個同樣的作業,避免出現這種情況可以在 add_job 的引數中增加 replace_existing=True,如
scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval',seconds=3,replace_existing=True)
- 1
如果我們想執行錯過執行的作業,使用 misfire_grace_time,如
scheduler.add_job(my_job,args = ['job_cron',] ,id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11',second='*/15',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2018-05-30')
- 1
說明:misfire_grace_time,假如一個作業本來 08:00 有一次執行,但是由於某種原因沒有被排程上,現在 08:01 了,這個 08:00 的執行例項被提交時,會檢查它預訂執行的時間和當下時間的差值(這裡是1分鐘),大於我們設定的 30 秒限制,那麼這個執行例項不會被執行。最常見的情形是 scheduler 被 shutdown 後重啟,某個任務會積攢了好幾次沒執行如 5 次,下次這個作業被提交給執行器時,執行 5 次。設定 coalesce=True 後,只會執行一次。
其他操作如下:
1 scheduler.remove_job(job_id,jobstore=None)#刪除作業
2 scheduler.remove_all_jobs(jobstore=None)#刪除所有作業
3 scheduler.pause_job(job_id,jobstore=None)#暫停作業
4 scheduler.resume_job(job_id,jobstore=None)#恢復作業
5 scheduler.modify_job(job_id, jobstore=None, **changes)#修改單個作業屬性資訊
6 scheduler.reschedule_job(job_id, jobstore=None, trigger=None,**trigger_args)#修改單個作業的觸發器並更新下次執行時間
7 scheduler.print_jobs(jobstore=None, out=sys.stdout)#輸出作業資訊
排程器事件監聽
scheduler 的基本應用,在前面已經介紹過了,但仔細思考一下:如果程式有異常丟擲會影響整個排程任務嗎?請看下面的程式碼,執行一下看看會發生什麼情況:
1 # coding:utf-8
2 from apscheduler.schedulers.blocking import BlockingScheduler
3 import datetime
4
5 def aps_test(x):
6 print (1/0)
7 print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
8
9 scheduler = BlockingScheduler()
10 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
11
12 scheduler.start()
執行結果如下:
Job "aps_test (trigger: cron[second='*/5'], next run at: 2018-04-05 12:46:35 CST)" raised an exception
Traceback (most recent call last):
File "C:\Users\xx\AppData\Local\Programs\python\python36\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "C:/Users/xx/PycharmProjects/mysite/staff/test2.py", line 7, in aps_test
print (1/0)
ZeroDivisionError: division by zero
Job "aps_test (trigger: cron[second='*/5'], next run at: 2018-04-05 12:46:35 CST)" raised an exception
Traceback (most recent call last):
File "C:\Users\xx\AppData\Local\Programs\python\python36\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "C:/Users/xx/PycharmProjects/mysite/staff/test2.py", line 7, in aps_test
print (1/0)
ZeroDivisionError: division by zero
可能看出每 5 秒丟擲一次報錯資訊。任何程式碼都可能丟擲異常,關鍵是,發生導常事件,如何第一時間知道,這才是我們最關心的,apscheduler 已經為我們想到了這些,提供了事件監聽來解決這一問題。
將上述程式碼稍做調整,加入日誌記錄和事件監聽,如下所示。
1 # coding:utf-8
2 from apscheduler.schedulers.blocking import BlockingScheduler
3 from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
4 import datetime
5 import logging
6
7 logging.basicConfig(level=logging.INFO,
8 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
9 datefmt='%Y-%m-%d %H:%M:%S',
10 filename='log1.txt',
11 filemode='a')
12
13
14 def aps_test(x):
15 print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
16
17
18 def date_test(x):
19 print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
20 print (1/0)
21
22
23 def my_listener(event):
24 if event.exception:
25 print ('任務出錯了!!!!!!')
26 else:
27 print ('任務照常執行...')
28
29 scheduler = BlockingScheduler()
30 scheduler.add_job(func=date_test, args=('一次性任務,會出錯',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
31 scheduler.add_job(func=aps_test, args=('迴圈任務',), trigger='interval', seconds=3, id='interval_task')
32 scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
33 scheduler._logger = logging
34
35 scheduler.start()
說明:
第 7-11 行配置日誌記錄資訊,日誌檔案在當前路徑,檔名為 “log1.txt”。
第 33 行啟用 scheduler 模組的日記記錄。
第 23-27 定義一個事件監聽,出現意外情況列印相關資訊報警。
執行結果如下所示。
2018-04-05 12:59:29 迴圈任務
任務照常執行...
2018-04-05 12:59:32 迴圈任務
任務照常執行...
2018-04-05 12:59:35 迴圈任務
任務照常執行...
2018-04-05 12:59:38 迴圈任務
任務照常執行...
2018-04-05 12:59:41 一次性任務,會出錯
任務出錯了!!!!!!
2018-04-05 12:59:41 迴圈任務
任務照常執行...
2018-04-05 12:59:44 迴圈任務
任務照常執行...
2018-04-05 12:59:47 迴圈任務
任務照常執行...
在生產環境中,可以把出錯資訊換成傳送一封郵件或者傳送一個簡訊,這樣定時任務出錯就可以立馬就知道。
(完)
您的點贊是我堅持的動力。