1. 程式人生 > 實用技巧 > Flask中使用apscheduler

Flask中使用apscheduler

from flask import Flask,request
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from pymongo import MongoClient
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
import  uuid
import  time
import logging
import os

from apscheduler.jobstores.base import JobLookupError

from Secdeer.mongoclient import Client as LocalMongoClient

logger = logging.getLogger(__name__)

# MongoDB connection settings. The defaults are the values this script has
# always used; each one can be overridden through the environment so that
# credentials do not have to live in the source file.
MONGO_HOST = os.environ.get("MONGO_HOST", "172.30.2.21")
MONGO_PORT = int(os.environ.get("MONGO_PORT", "27017"))
MONGO_USERNAME = os.environ.get("MONGO_USERNAME", "admin123")
MONGO_PASSWORD = os.environ.get("MONGO_PASSWORD", "admin123")
MONGO_DB = os.environ.get("MONGO_DB", "zero")

# Project-local client used for the "UserStrate" documents.
mdb = LocalMongoClient(
    username=MONGO_USERNAME,
    password=MONGO_PASSWORD,
    host=MONGO_HOST,
    port=MONGO_PORT,
    db=MONGO_DB,
)
# Raw pymongo client handed to APScheduler's MongoDB job store.
client = MongoClient(
    MONGO_HOST,
    MONGO_PORT,
    username=MONGO_USERNAME,
    password=MONGO_PASSWORD,
)

# 1. Job store and executor configuration: jobs are persisted in the
#    "LoopTask" collection and executed on a pool of 10 threads.
jobstores = {
    'default': MongoDBJobStore(client=client, database=MONGO_DB, collection='LoopTask'),
}
executors = {
    'default': ThreadPoolExecutor(max_workers=10),
}


def job_function():
    """Print a timestamped heartbeat line; scheduled every 3 seconds below."""
    t = time.strftime('%Y-%m-%d %H:%M:%S')
    print('job1 --- {}'.format(t))


# 2. Create the scheduler.
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
# The job store is persistent, so a job added at start-up needs a fixed id and
# replace_existing=True; otherwise every restart stores one more copy of it.
# NOTE(review): copies already stored under random ids by earlier runs are not
# cleaned up by this change.
scheduler.add_job(
    job_function,
    'interval',
    seconds=3,
    id='job_function',
    replace_existing=True,
)
scheduler.start()

app = Flask(__name__)


# 3. Define the scheduled task.
def print_test():
    """Job body used by the one-off jobs created through /add_job."""
    print("print_test")


def _get_strategy_task_ids(str_id):
    """Return a copy of the job-id list stored for strategy *str_id*.

    Returns None when no strategy document is found.
    NOTE(review): assumes mdb.get_one_by_id returns a falsy value for an
    unknown id -- confirm against Secdeer.mongoclient.
    """
    str_item = mdb.get_one_by_id("UserStrate", str_id)
    if not str_item:
        return None
    # A document without the key (or with a null value) counts as "no jobs yet".
    return list(str_item.get("str_task_id_lsit") or [])


# Start the scheduler on demand.
@app.route('/start')
def start():
    """Start the background scheduler if it is not running yet (non-blocking)."""
    # The scheduler is already started at import time; starting it a second
    # time raises SchedulerAlreadyRunningError, so only start when stopped.
    if not scheduler.running:
        scheduler.start()
    return '啟動成功'


@app.route("/add_str", methods=['POST'])
def add_str():
    """Create a new strategy document with an empty job-id list."""
    mdb.add("UserStrate", {"str_task_id_lsit": []})
    return "新增成功"


# Add a scheduled job dynamically.
@app.route("/add_job", methods=['POST'])
def add_job():
    """Schedule a one-off ``print_test`` job and record its id on a strategy.

    Form fields:
        seconds: passed to the ``date`` trigger as ``run_date`` (despite the
            name it is a date/time value, not a number of seconds).
        str_id: id of the "UserStrate" document that owns the job.

    Returns 404 when the strategy does not exist and 400 when the run date
    cannot be parsed.
    """
    print(request.form)
    seconds = request.form.get("seconds")
    str_id = request.form.get("str_id")

    str_task_id_lsit = _get_strategy_task_ids(str_id)
    if str_task_id_lsit is None:
        return "strategy not found: {}".format(str_id), 404

    print(request.method)
    print("add_job")
    print(seconds)

    str_uuid = str(uuid.uuid4())
    # Schedule first and persist the id afterwards, so that a rejected run
    # date does not leave a dangling job id on the strategy document.
    try:
        scheduler.add_job(print_test, 'date', run_date=seconds, id=str_uuid)
    except ValueError as exc:
        logger.warning("add_job rejected run_date %r: %s", seconds, exc)
        return "invalid run date: {}".format(seconds), 400

    str_task_id_lsit.append(str_uuid)
    mdb.update_one_by_id("UserStrate", str_id, {"str_task_id_lsit": str_task_id_lsit})
    return "add_job成功"


# Pause a scheduled job dynamically.
@app.route("/pause_job/<param>")
def pause_job(param):
    """Pause the job whose id is *param*; 404 when no such job exists."""
    try:
        scheduler.pause_job(param)
    except JobLookupError:
        return "job not found: {}".format(param), 404
    return "動態暫停定時任務成功:{}".format(param)


# Resume a scheduled job dynamically.
@app.route("/resume_job/<param>")
def resume_job(param):
    """Resume the job whose id is *param*; 404 when no such job exists."""
    try:
        scheduler.resume_job(param)
    except JobLookupError:
        return "job not found: {}".format(param), 404
    return "動態恢復定時任務成功:{}".format(param)


# Remove a scheduled job dynamically.
@app.route("/remove_job/<param>")
def remove_job(param):
    """Remove the job whose id is *param*; 404 when no such job exists."""
    try:
        scheduler.remove_job(param)
    except JobLookupError:
        return "job not found: {}".format(param), 404
    return "動態刪除定時任務成功:{}".format(param)


# Remove every scheduled job that belongs to one strategy.
@app.route("/del_job", methods=['POST'])
def del_job():
    """Remove all jobs recorded on the strategy given by form field ``str_id``.

    Removal is best-effort: a failure on one job does not stop the others.
    Ids that are gone from the scheduler are dropped from the stored list;
    ids whose removal failed for another reason are kept so that the call can
    be retried.
    """
    str_id = request.form.get("str_id")
    str_task_id_lsit = _get_strategy_task_ids(str_id)
    if str_task_id_lsit is None:
        return "strategy not found: {}".format(str_id), 404

    still_scheduled = []
    for str_task_id in str_task_id_lsit:
        print(str_task_id)
        try:
            scheduler.remove_job(str_task_id)
        except JobLookupError:
            # Already gone, e.g. a one-off job that has run.
            logger.info("job %s no longer exists", str_task_id)
        except Exception:
            logger.exception("failed to remove job %s", str_task_id)
            still_scheduled.append(str_task_id)

    # Write the cleaned list back so stale ids do not pile up on the document.
    if still_scheduled != str_task_id_lsit:
        mdb.update_one_by_id("UserStrate", str_id, {"str_task_id_lsit": still_scheduled})
    return "關閉所有定時任務成功"


@app.route("/test")
def test():
    """Trivial health-check endpoint."""
    print("test")
    return "test"


if __name__ == '__main__':
    app.run()
View Code