1. 程式人生 > 有關python定時任務服務[win]

有關python定時任務服務[win]

關於python實現定時任務排程服務,直接上原始碼:

# -*- coding:utf-8 -*-
import datetime
import subprocess
import sys
import time

import servicemanager
import win32event
import win32service
import win32serviceutil
import winerror

# Module author metadata. The conventional dunder is ``__author__``; the
# original spelling is kept as an alias so any existing lookup still works.
__author__ = 'Bill'
__authro__ = __author__

import os
from apscheduler.schedulers.blocking import BlockingScheduler


def get_file_dir():
    """Return the absolute directory of this source file, ending in a backslash."""
    script_path = os.path.abspath(__file__)
    folder, _filename = os.path.split(script_path)
    return folder + "\\"


# Directory of this script (with trailing backslash), computed once at import
# time; Worker builds the batch-file paths relative to it.
exe_path = get_file_dir()


class Worker(object):
    """Scheduled jobs; each one runs a batch file from the ``bat`` sub-directory."""

    @classmethod
    def _run_bat(cls, bat_file):
        """Run one batch file from ``<exe_path>bat`` and wait for it to finish.

        The full path is handed to ``subprocess.call`` as a single argument
        instead of being pasted into a shell command line, so an install
        directory containing spaces or parentheses is not split or mangled.

        Args:
            bat_file: File name of the batch script inside the ``bat`` folder.

        Returns:
            The exit code of the batch process.

        Raises:
            OSError: If the batch file cannot be launched (e.g. it is missing).
        """
        bat_path = exe_path + "bat\\" + bat_file
        return subprocess.call([bat_path])

    @classmethod
    def DoPhpBatWorker(cls):
        """Run ``loop.bat``."""
        cls._run_bat('loop.bat')

    @classmethod
    def DoVMWorker(cls):
        """Run ``startVmware.bat``."""
        cls._run_bat('startVmware.bat')


class TickScheduler(object):
    """Wraps a ``BlockingScheduler`` and registers the ``Worker`` jobs on it."""

    def __init__(self):
        self.__blockingScheduler = BlockingScheduler()

    def __addTask(self):
        """Register the recurring job and the one-shot job on the scheduler."""
        # The one-shot job fires one minute from now; the datetime is passed
        # directly instead of being formatted to a string and parsed back.
        onceTime = datetime.datetime.now() + datetime.timedelta(minutes=1)
        # Every 15 seconds.
        self.__blockingScheduler.add_job(Worker.DoPhpBatWorker, 'interval', seconds=15)
        # Runs exactly once.
        self.__blockingScheduler.add_job(Worker.DoVMWorker, 'date', run_date=onceTime)

    def run(self):
        """Register the jobs and start the scheduler.

        ``BlockingScheduler.start()`` blocks the calling thread, so this
        method only returns once the scheduler has been shut down or an
        interrupt (``KeyboardInterrupt`` / ``SystemExit``) was raised.
        """
        self.__addTask()
        try:
            self.__blockingScheduler.start()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

    def stop(self):
        """Shut the scheduler down if it is running; otherwise do nothing.

        The guard makes ``stop()`` safe to call before ``run()`` or more than
        once, where an unconditional ``shutdown()`` would raise.
        """
        if self.__blockingScheduler.running:
            self.__blockingScheduler.shutdown()


class AITickerTaskService(win32serviceutil.ServiceFramework):
    """Windows service that hosts a ``TickScheduler``.

    ``SvcDoRun`` runs the scheduler on the service thread; ``SvcStop`` shuts
    the scheduler down and signals ``hWaitStop`` so ``SvcDoRun`` can return.
    """

    _svc_name_ = "AITickerTaskService"
    _svc_display_name_ = "AITickerTaskService"
    _svc_description_ = "Ticker服務元件"

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        # Auto-reset event, initially unsignalled; set by SvcStop.
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
        self.__iniTask()

    def __iniTask(self):
        """Create the scheduler wrapper used by this service."""
        self.__tickScheduler = TickScheduler()

    def SvcDoRun(self):
        """Run the scheduler, then stay alive until a stop has been requested."""
        self.run = True
        # Blocks until the scheduler is shut down.
        self.__tickScheduler.run()
        # If the scheduler returned without a stop request, wait for one.
        # Waiting on the stop event (4 s timeout, then re-check the flag)
        # wakes up immediately when SvcStop signals it.
        while self.run:
            win32event.WaitForSingleObject(self.hWaitStop, 4000)

    def SvcStop(self):
        """Handle a stop request from the service control manager."""
        # Report STOP_PENDING first, before any potentially slow shutdown
        # work, so the control manager sees the request acknowledged.
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        self.run = False
        try:
            self.__tickScheduler.stop()
        finally:
            # Always signal the event, even if stopping the scheduler failed,
            # so SvcDoRun is never left waiting.
            win32event.SetEvent(self.hWaitStop)


if __name__ == '__main__':
    if len(sys.argv) == 1:
        # No command-line arguments: try to run as the service host by
        # connecting to the service control dispatcher.
        try:
            evtsrc_dll = os.path.abspath(servicemanager.__file__)
            servicemanager.PrepareToHostSingle(AITickerTaskService)
            servicemanager.Initialize('AITickerTaskService', evtsrc_dll)
            servicemanager.StartServiceCtrlDispatcher()
        except win32service.error as details:
            # Exceptions are not subscriptable on Python 3, so read the
            # error code from ``args`` rather than ``details[0]``.
            if details.args[0] == winerror.ERROR_FAILED_SERVICE_CONTROLLER_CONNECT:
                # Could not connect to the service controller: show the
                # command-line usage instead.
                win32serviceutil.usage()
            else:
                # Any other service error is unexpected; do not hide it.
                raise
    else:
        # Arguments given (install / start / stop / remove ...): let pywin32
        # handle the command line.
        win32serviceutil.HandleCommandLine(AITickerTaskService)