How Spring implements scheduled tasks
Annotation-based approach:
Core classes at a glance:
1.ScheduledAnnotationBeanPostProcessor
2.ScheduledTaskRegistrar
3.TaskScheduler
4.ReschedulingRunnable
Details:
1.ScheduledAnnotationBeanPostProcessor
(1) Core method: Object postProcessAfterInitialization(final Object bean, String beanName)
Purpose: scans each bean for @Scheduled annotations and builds the corresponding scheduled tasks.
(2) Core method: onApplicationEvent(ContextRefreshedEvent event)
2.ScheduledTaskRegistrar
(1) Core method: void afterPropertiesSet()
Purpose: initializes all registered scheduled tasks and starts them.
3.TaskScheduler
There are three main implementations: ThreadPoolTaskScheduler, ConcurrentTaskScheduler, and TimerManagerTaskScheduler.
Role: these classes wrap the task and the executor in a ReschedulingRunnable, which manages the task's lifecycle.
(1) Core method: ScheduledFuture schedule(Runnable task, Trigger trigger)
4.ReschedulingRunnable
(1) Core method: public ScheduledFuture schedule()
(2) Core method: public void run()
public ScheduledFuture schedule() {
    synchronized (this.triggerContextMonitor) {
        this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
        this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
        return this;
    }
}

@Override
public void run() {
    Date actualExecutionTime = new Date();
    super.run();
    Date completionTime = new Date();
    synchronized (this.triggerContextMonitor) {
        this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
    }
    if (!this.currentFuture.isCancelled()) {
        schedule();
    }
}
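schedule() and run() call each other. schedule() asks the trigger for the next execution time and uses the one-shot schedule(Runnable command, long delay, TimeUnit unit) method of the ScheduledExecutorService interface to arm a single execution; run() executes the task and, unless the task has been cancelled, calls schedule() again. Together they produce repeated execution at the times the trigger dictates.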
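The mutual-call pattern above can be reproduced with the JDK alone. The following is a minimal sketch, not Spring's ReschedulingRunnable: the class SelfReschedulingTask, its fixed delayMillis (standing in for a Trigger), and the helper runTimes are all hypothetical names introduced for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; a fixed delay replaces Spring's Trigger.
class SelfReschedulingTask implements Runnable {
    private final Runnable delegate;
    private final ScheduledExecutorService executor;
    private final long delayMillis;
    private final Object monitor = new Object();
    private ScheduledFuture<?> currentFuture;
    private boolean cancelled;

    SelfReschedulingTask(Runnable delegate, ScheduledExecutorService executor, long delayMillis) {
        this.delegate = delegate;
        this.executor = executor;
        this.delayMillis = delayMillis;
    }

    // Arms exactly one execution; the executor itself knows nothing about repetition.
    void schedule() {
        synchronized (monitor) {
            if (!cancelled) {
                currentFuture = executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    @Override
    public void run() {
        delegate.run();
        schedule(); // re-arm after the run; does nothing once cancelled
    }

    void cancel() {
        synchronized (monitor) {
            cancelled = true;
            if (currentFuture != null) {
                currentFuture.cancel(false);
            }
        }
    }

    // Lets a counting task fire `times` times, then returns the observed count.
    static int runTimes(int times, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<SelfReschedulingTask> self = new AtomicReference<>();
        self.set(new SelfReschedulingTask(() -> {
            if (counter.incrementAndGet() == times) {
                self.get().cancel(); // cancel from inside the task so no extra run slips in
                done.countDown();
            }
        }, executor, delayMillis));
        try {
            self.get().schedule();
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
            return counter.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runTimes(3, 20));
    }
}
```

Each call to executor.schedule is a single delayed execution; the repetition comes entirely from run() calling schedule() again, which is the same division of labour the Spring code relies on.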
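The configuration-file approach works in much the same way, except that the ScheduledTasksBeanDefinitionParser class reads the XML nodes and assembles the corresponding scheduled-task beans.
Implementation of the underlying timer
The ScheduledThreadPoolExecutor class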
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    delayedExecute(t); // delayed execution
    return t;
}

// execution continues in delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task); // add the task to the work queue
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart(); // called when the task has not been cancelled
    }
}
If the task has not been cancelled, ensurePrestart() is called. ensurePrestart() calls addWorker(), which creates the Worker that executes tasks and starts its thread; the Worker's run method then calls runWorker:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
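The lazy worker start described above (ensurePrestart() calling addWorker()) can be observed from outside through the public API. The sketch below is only an illustration; the class LazyWorkerStartDemo and its method poolSizes are hypothetical names, and the one-hour delay is chosen simply so that the task stays queued while the sizes are read.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: observes pool and queue sizes around schedule().
class LazyWorkerStartDemo {

    // Returns {pool size before schedule(), pool size after, queue size after}.
    static int[] poolSizes() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            int before = executor.getPoolSize();   // no worker thread has been created yet
            executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            int after = executor.getPoolSize();    // schedule() started a worker
            int queued = executor.getQueue().size(); // the task waits in the delay queue
            return new int[] { before, after, queued };
        } finally {
            executor.shutdownNow(); // interrupts the idle worker so the JVM can exit
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes();
        System.out.println("before=" + sizes[0] + " after=" + sizes[1] + " queued=" + sizes[2]);
    }
}
```

The worker exists as soon as schedule() returns, even though the task is not due for an hour: the thread is already inside runWorker, blocked in getTask() on the delay queue.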
The tasks that runWorker executes are obtained by calling getTask() in the ThreadPoolExecutor class.
getTask() in turn calls the take or poll method implemented by DelayedWorkQueue, an inner class of ScheduledThreadPoolExecutor:
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // Implements the delay: wait until the head task is due,
        // then return that RunnableScheduledFuture
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
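DelayedWorkQueue is private to ScheduledThreadPoolExecutor, so it cannot be exercised directly. As a stand-in, the sketch below uses the JDK's public java.util.concurrent.DelayQueue, whose take() has the same contract: it blocks until the element with the smallest remaining delay is due. The class DelayedTakeDemo, the element type DelayedItem, and both helper methods are hypothetical names introduced for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows take() blocking until the head element is due.
class DelayedTakeDemo {

    static final class DelayedItem implements Delayed {
        final String name;
        final long triggerNanos; // absolute due time on the System.nanoTime() clock

        DelayedItem(String name, long delayMillis) {
            this.name = name;
            this.triggerNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Inserts the later item first; returns the names in the order take() yields them.
    static String takeOrder() {
        DelayQueue<DelayedItem> queue = new DelayQueue<>();
        queue.add(new DelayedItem("late", 150));
        queue.add(new DelayedItem("early", 30));
        try {
            return queue.take().name + "," + queue.take().name;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns how many milliseconds passed before take() handed out a single item.
    static long millisUntilTaken(long delayMillis) {
        long start = System.nanoTime();
        DelayQueue<DelayedItem> queue = new DelayQueue<>();
        queue.add(new DelayedItem("only", delayMillis));
        try {
            queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println(takeOrder());
        System.out.println(millisUntilTaken(50) + " ms");
    }
}
```

Elements come out in due-time order regardless of insertion order, and take() never returns an element before its delay has elapsed. This is the behaviour that lets a worker thread sit in getTask() until the next scheduled task is ready.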