elastic-job之監聽器
每個作業都可以配置一個任務監聽器,確切的說是隻能配置一個本地監聽器和一個分散式監聽器。Elastic-job有三種作業型別,但是它們的通用配置都是一樣的,所以本文在介紹作業的監聽器配置時將僅以簡單作業的配置為例。
本地監聽器
本地監聽器只在節點執行自己分片的時候排程,每個分片任務排程的時候本地監聽器都會執行。本地監聽器由ElasticJobListener介面定義,其定義如下:
/** * 彈性化分散式作業監聽器介面. * * @author zhangliang */ public interface ElasticJobListener { /** * 作業執行前的執行的方法.* * @param shardingContexts 分片上下文 */ void beforeJobExecuted(final ShardingContexts shardingContexts); /** * 作業執行後的執行的方法. * * @param shardingContexts 分片上下文 */ void afterJobExecuted(final ShardingContexts shardingContexts); }
該介面的介面方法的註釋上已經說明了對應的介面方法的呼叫時機,詳情也可以參考com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute()方法。簡單示例如下:
public class MyElasticJobListener implements ElasticJobListener { private static final Logger LOGGER = Logger.getLogger(MyElasticJobListener.class); @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { LOGGER.info(String.format("開始排程任務[%s]", shardingContexts.getJobName())); } @Overridepublic void afterJobExecuted(ShardingContexts shardingContexts) { LOGGER.info(String.format("任務[%s]排程完成", shardingContexts.getJobName())); } }
本地監聽器的配置由<job:listener/>
節點配置,如下示例中就通過<job:listener/>
給簡單作業myElasticJob定義了一個本地監聽器。
<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/> <job:simple id="myElasticJob" job-ref="simpleJob" registry-center-ref="regCenter" cron="0/30 * * * * ?" sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F" failover="true" overwrite="true" > <job:listener class="com.elim.learn.elastic.job.listener.MyElasticJobListener" /> </job:simple>
分散式監聽器
本地監聽器在作業執行本地的分片任務時會執行,如上面的示例,我們的作業被分成了6片,則監聽器任務會執行6次。而分散式監聽器會在總的任務開始執行時執行一次,在總的任務結束執行時執行一次。分散式監聽器也是在普通監聽器的基礎上實現的,由AbstractDistributeOnceElasticJobListener抽象類封裝的,其實現了ElasticJobListener介面。要實現自己的監聽器只需要繼承AbstractDistributeOnceElasticJobListener抽象類,實現其中的抽象方法即可。AbstractDistributeOnceElasticJobListener抽象類的定義如下:
/** * 在分散式作業中只執行一次的監聽器. * * @author zhangliang */ public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener { private final long startedTimeoutMilliseconds; private final Object startedWait = new Object(); private final long completedTimeoutMilliseconds; private final Object completedWait = new Object(); @Setter private GuaranteeService guaranteeService; private TimeService timeService = new TimeService(); public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) { if (startedTimeoutMilliseconds <= 0L) { this.startedTimeoutMilliseconds = Long.MAX_VALUE; } else { this.startedTimeoutMilliseconds = startedTimeoutMilliseconds; } if (completedTimeoutMilliseconds <= 0L) { this.completedTimeoutMilliseconds = Long.MAX_VALUE; } else { this.completedTimeoutMilliseconds = completedTimeoutMilliseconds; } } @Override public final void beforeJobExecuted(final ShardingContexts shardingContexts) { guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet()); if (guaranteeService.isAllStarted()) { doBeforeJobExecutedAtLastStarted(shardingContexts); guaranteeService.clearAllStartedInfo(); return; } long before = timeService.getCurrentMillis(); try { synchronized (startedWait) { startedWait.wait(startedTimeoutMilliseconds); } } catch (final InterruptedException ex) { Thread.interrupted(); } if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) { guaranteeService.clearAllStartedInfo(); handleTimeout(startedTimeoutMilliseconds); } } @Override public final void afterJobExecuted(final ShardingContexts shardingContexts) { guaranteeService.registerComplete(shardingContexts.getShardingItemParameters().keySet()); if (guaranteeService.isAllCompleted()) { doAfterJobExecutedAtLastCompleted(shardingContexts); guaranteeService.clearAllCompletedInfo(); return; } long before = timeService.getCurrentMillis(); try { synchronized (completedWait) { completedWait.wait(completedTimeoutMilliseconds); } } catch (final InterruptedException ex) { Thread.interrupted(); } if (timeService.getCurrentMillis() - before >= completedTimeoutMilliseconds) { guaranteeService.clearAllCompletedInfo(); handleTimeout(completedTimeoutMilliseconds); } } private void handleTimeout(final long timeoutMilliseconds) { throw new JobSystemException("Job timeout. timeout mills is %s.", timeoutMilliseconds); } /** * 分散式環境中最後一個作業執行前的執行的方法. * * @param shardingContexts 分片上下文 */ public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts); /** * 分散式環境中最後一個作業執行後的執行的方法. * * @param shardingContexts 分片上下文 */ public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts); /** * 通知任務開始. */ public void notifyWaitingTaskStart() { synchronized (startedWait) { startedWait.notifyAll(); } } /** * 通知任務結束. */ public void notifyWaitingTaskComplete() { synchronized (completedWait) { completedWait.notifyAll(); } } }
以下是一個使用分散式監聽器的示例:
public class MyDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { private static final Logger logger = Logger.getLogger(MyDistributeOnceElasticJobListener.class); /** * @param startedTimeoutMilliseconds * @param completedTimeoutMilliseconds */ public MyDistributeOnceElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) { super(startedTimeoutMilliseconds, completedTimeoutMilliseconds); } @Override public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) { logger.info("分散式監聽器開始……"); } @Override public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) { logger.info("分散式監聽器結束……"); } }
分散式監聽器用到了鎖的等待和通知,startedTimeoutMilliseconds和completedTimeoutMilliseconds分別用來指定作業開始前和完成後的對應的鎖等待最大超時時間。分散式監聽器由<job:distributed-listener/>
,以下是一個使用分散式監聽器的示例:
<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/> <job:simple id="myElasticJob" job-ref="simpleJob" registry-center-ref="regCenter" cron="0/30 * * * * ?" sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F" failover="true" overwrite="true" > <job:distributed-listener class="com.elim.learn.elastic.job.listener.MyDistributeOnceElasticJobListener" started-timeout-milliseconds="100" completed-timeout-milliseconds="100"/> </job:simple>
(本文由Elim寫於2017年10月2日)