有關 Python 定時任務服務(Windows)
阿新 • • 發佈:2018-12-16
關於以 Python 實現定時任務排程服務,直接上原始碼:
# -*- coding:utf-8 -*-
"""Windows service that runs batch files on a schedule.

The service hosts an APScheduler ``BlockingScheduler`` which runs
``bat\\loop.bat`` every 15 seconds and ``bat\\startVmware.bat`` once, one
minute after the scheduler's jobs are registered.  Both batch files are looked
up in a ``bat`` directory located next to this script.

Run without arguments the script tries to connect to the Windows service
control manager; with arguments it delegates to pywin32's standard service
command line (``win32serviceutil.HandleCommandLine``).
"""
import datetime
import os
import subprocess
import sys
import time

import servicemanager
import win32event
import win32service
import win32serviceutil
import winerror
from apscheduler.schedulers.blocking import BlockingScheduler

__author__ = 'Bill'
# The original module spelled this name "__authro__"; keep it as an alias so
# anything that read the misspelled attribute keeps working.
__authro__ = __author__


def get_file_dir():
    """Return the directory containing this script, with a trailing backslash."""
    return os.path.dirname(os.path.abspath(__file__)) + "\\"


# Directory of this script (trailing backslash included); batch files are
# resolved relative to it.
exe_path = get_file_dir()


class Worker(object):
    """Job callables registered with the scheduler; each runs one batch file."""

    @classmethod
    def _run_bat(cls, bat_file):
        """Run ``bat_file`` from the ``bat`` directory and wait for it to exit.

        The path is handed to ``subprocess.call`` as a one-element argument
        list rather than concatenated into a shell string, so an install
        directory containing spaces does not split the command.
        """
        bat_path = os.path.join(exe_path, "bat", bat_file)
        subprocess.call([bat_path])

    @classmethod
    def DoPhpBatWorker(cls):
        """Run ``bat\\loop.bat``."""
        cls._run_bat('loop.bat')

    @classmethod
    def DoVMWorker(cls):
        """Run ``bat\\startVmware.bat``."""
        cls._run_bat('startVmware.bat')


class TickScheduler(object):
    """Thin wrapper around a ``BlockingScheduler`` preloaded with the Worker jobs."""

    def __init__(self):
        self.__blockingScheduler = BlockingScheduler()

    def __addTask(self):
        """Register the recurring and the one-shot job on the scheduler."""
        once_time = datetime.datetime.now() + datetime.timedelta(minutes=1)
        # Every 15 seconds.
        self.__blockingScheduler.add_job(
            Worker.DoPhpBatWorker, 'interval', seconds=15)
        # Runs only once, one minute from now.
        self.__blockingScheduler.add_job(
            Worker.DoVMWorker, 'date', run_date=once_time)

    def run(self):
        """Register the jobs and start the scheduler.

        ``BlockingScheduler.start()`` blocks the calling thread, so this only
        returns after the scheduler has been shut down (see ``stop``) or the
        start call was interrupted.
        """
        self.__addTask()
        try:
            self.__blockingScheduler.start()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

    def stop(self):
        """Shut the scheduler down if it is running.

        ``shutdown()`` raises when the scheduler is not running, which would
        otherwise happen if a stop request arrived before ``run`` started the
        scheduler or after it had already been stopped.
        """
        if self.__blockingScheduler.running:
            self.__blockingScheduler.shutdown()


class AITickerTaskService(win32serviceutil.ServiceFramework):
    """pywin32 service wrapper that hosts a ``TickScheduler``."""

    _svc_name_ = "AITickerTaskService"
    _svc_display_name_ = "AITickerTaskService"
    _svc_description_ = "Ticker服務元件"

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        # Auto-reset event, initially unsignalled; set by SvcStop.
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
        self.__iniTask()

    def __iniTask(self):
        """Create the scheduler wrapper used by SvcDoRun / SvcStop."""
        self.__tickScheduler = TickScheduler()

    def SvcDoRun(self):
        """Service entry point: run the scheduler until a stop is requested."""
        self.run = True
        # Blocks until SvcStop shuts the scheduler down.
        self.__tickScheduler.run()
        # If the scheduler returned for any other reason, stay alive until the
        # stop event is signalled instead of polling a flag with sleeps.
        win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)

    def SvcStop(self):
        """Handle a stop request."""
        # Report the pending stop first, before doing any work that may take
        # time (shutting the scheduler down).
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        self.run = False
        self.__tickScheduler.stop()
        win32event.SetEvent(self.hWaitStop)


if __name__ == '__main__':
    if len(sys.argv) == 1:
        # No arguments: try to connect to the service control manager.
        try:
            evtsrc_dll = os.path.abspath(servicemanager.__file__)
            servicemanager.PrepareToHostSingle(AITickerTaskService)
            servicemanager.Initialize('AITickerTaskService', evtsrc_dll)
            servicemanager.StartServiceCtrlDispatcher()
        except win32service.error as details:
            # Exceptions are not subscriptable on Python 3, so read the error
            # code from the ``winerror`` attribute rather than ``details[0]``.
            if details.winerror == winerror.ERROR_FAILED_SERVICE_CONTROLLER_CONNECT:
                # Could not connect to the service controller: show usage.
                win32serviceutil.usage()
            else:
                # Do not silently hide any other service-controller failure.
                raise
    else:
        # install / start / stop / remove / debug ... handled by pywin32.
        win32serviceutil.HandleCommandLine(AITickerTaskService)