1. 程式人生 > >Spring 定時器schedule實現

Spring 定時器schedule實現

執行 掃描 clearing process obj after ice wait code

註解方式:
核心類摘要:
1.ScheduledAnnotationBeanPostProcessor
2.ScheduledTaskRegistrar
3.TaskScheduler
4.ReschedulingRunnable
具體說明:
1.ScheduledAnnotationBeanPostProcessor

(1)核心方法:Object postProcessAfterInitialization(final Object bean, String beanName)
功能:負責@Schedule註解的掃描,構建ScheduleTask

(2)核心方法:onApplicationEvent(ContextRefreshedEvent event)

功能:spring容器加載完畢之後調用,ScheduleTask向ScheduledTaskRegistrar中註冊, 調用ScheduledTaskRegistrar.afterPropertiesSet()

2.ScheduledTaskRegistrar

(1)核心方法:void afterPropertiesSet()
功能:初始化所有定時器,啟動定時器

3.TaskScheduler
主要的實現類有三個 ThreadPoolTaskScheduler, ConcurrentTaskScheduler,TimerManagerTaskScheduler
作用:這些類的作用主要是將task和executor用ReschedulingRunnable包裝起來進行生命周期管理。

(1)核心方法:ScheduledFuture schedule(Runnable task, Trigger trigger)

4.ReschedulingRunnable
(1)核心方法:public ScheduledFuture schedule()
(2)核心方法:public void run()

public ScheduledFuture schedule() {
        synchronized (this.triggerContextMonitor) {
            this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
            if (this.scheduledExecutionTime == null) {
                return null;
            }
            long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
            this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
            return this;
        }
    }

    @Override
    public void run() {
        Date actualExecutionTime = new Date();
        super.run();
        Date completionTime = new Date();
        synchronized (this.triggerContextMonitor) {
            this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
        }
        if (!this.currentFuture.isCancelled()) {
            schedule();
        }
    }

通過schedule方法及run方法互相調用,再利用ScheduledExecutorService接口的schedule(Runnable command,long delay,TimeUnit unit)單次執行效果,從而實現一定時間重復觸發的效果。

配置文件的方式基本相似只是通過ScheduledTasksBeanDefinitionParser類讀取節點組裝對應定時任務bean
Spring定時器的實現

ScheduledThreadPoolExecutor類

 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
        delayedExecute(t);//延遲執行
        return t;
    }

    //執行到delaydExecute方法
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);//把任務加入到執行隊列中
            if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                task.cancel(false);
            else
                ensurePrestart();//任務未取消則調用
        }
    }

任務未取消則調用ensurePrestart(),ensurePrestart方法中調用了addWorker()方法,addWorker()方法中創建執行任務的Woker並且調用woker的run方法,調用runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法中的任務通過調用了ThreadPoolExecutor類中的getTask方法獲得
getTask()方法中調用了ScheduledThreadPoolExecutor內部類DelayedWorkQueue重寫的take方法或poll方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //實現延遲執行,根據延遲時間等待直到執行時間返回RunnableScheduledFuture
            for (; ; ) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }

Spring 定時器schedule實現