Elastic-Job-Lite 原始碼閱讀 ---- 作業執行
作業執行的核心流程:
因為使用了 Quartz,任務執行是實現了 Quartz 的 Job 介面:
public final class LiteJob implements Job { @Setter private ElasticJob elasticJob; @Setter private JobFacade jobFacade; @Override public void execute(final JobExecutionContext context) throws JobExecutionException { // 工廠方法生成具體 Job 型別,如當前是 SimpleJob。 JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute(); } }
1.具體的 Job 來執行作業:
/** * 執行作業. */ public final void execute() { try { // 檢查作業執行環境,檢查本機與註冊中心的時間誤差秒數是否在允許範圍 jobFacade.checkJobExecutionEnvironment(); } catch (final JobExecutionEnvironmentException cause) { jobExceptionHandler.handleException(jobName, cause); } // 獲取當前作業伺服器分片上下文 ShardingContexts shardingContexts = jobFacade.getShardingContexts(); // 是否允許可以傳送作業事件. if (shardingContexts.isAllowSendJobEvent()) { // 釋出作業狀態追蹤事件. jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName)); } if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format( "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, shardingContexts.getShardingItemParameters().keySet())); } return; } try { // 作業執行前的執行的方法: 增加分片監聽 jobFacade.beforeJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobExceptionHandler.handleException(jobName, cause); } execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet()); execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE); } // 故障轉移 jobFacade.failoverIfNecessary(); try { // 作業執行後的執行方法 jobFacade.afterJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobExceptionHandler.handleException(jobName, cause); } }
1.1具體看下獲取當前作業伺服器分片上下文程式碼:
@Override public ShardingContexts getShardingContexts() { // 是否開啟故障轉移 boolean isFailover = configService.load(true).isFailover(); if (isFailover) { List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems(); if (!failoverShardingItems.isEmpty()) { return executionContextService.getJobShardingContext(failoverShardingItems); } } // 如果需要分片且當前節點為主節點, 則作業分片. // 如果當前無可用節點則不分片. shardingService.shardingIfNecessary(); // 獲取執行在本作業例項的分片集合 List<Integer> shardingItems = shardingService.getLocalShardingItems(); if (isFailover) { shardingItems.removeAll(failoverService.getLocalTakeOffItems()); } // 刪除禁用的分片項 shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); // 獲取當前作業服務分片上下文 return executionContextService.getJobShardingContext(shardingItems); }
1.1.1進入 shardingService.shardingIfNecessary(); // 任務分片就在這個方法中完成
public void shardingIfNecessary() {
// 獲取當前可用作業 instance 即作業節點
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
if (!leaderService.isLeaderUntilBlock()) { // 判斷當前節點是否是主節點。如果主節點正在選舉中而導致取不到主節點, 則sleep至主節點選舉完成再返回
blockUntilShardingCompleted();
return;
}
// 等待其他分片執行完成
waitingOtherShardingItemCompleted();
LiteJobConfiguration liteJobConfig = configService.load(false);
int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
// 主節點分片時持有此節點作為標記,有次節點所有作業執行都阻塞直到分片結束
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
// 把 jobName/sharding/ 下原有的分片清理掉,重新建立分片資訊
resetShardingInfo(shardingTotalCount);
// 選定分片策略,預設是平均分片演算法
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
// 真正執行分片操作地方,分片完成後執行事務回撥
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
log.debug("Job '{}' sharding complete.", jobName);
}
1.1.1.1 進入 jobNodeStorage.executeInTransaction:
/**
* 在事務中執行操作.
*
* @param callback 執行操作的回撥
*/
public void executeInTransaction(final TransactionExecutionCallback callback) {
try {
// 分片肯定是要麼全部成功,要麼分片全部失敗,所以放到事務中操作
CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
callback.execute(curatorTransactionFinal);
curatorTransactionFinal.commit();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
1.1.1.1.1 事務操作的核心 callback.execute(curatorTransactionFinal);
// 事務執行操作回撥介面
@RequiredArgsConstructor
class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
private final Map<JobInstance, List<Integer>> shardingResults;
@Override
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
for (int shardingItem : entry.getValue()) {
// 將分片分配給具體的作業節點。事務操作,全部成功後才能看到分片的結果
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
}
}
// 分片完成後,去掉需要分片標記
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();
// 分片完成後,去掉分片執行中標記
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
}
}
執行完成後,可以 zk 看到結果:
get /elastic-job-demo/JobLite/sharding/0/instance
[email protected]@18444
1.1.2進入 executionContextService.getJobShardingContext(shardingItems); 細看一下:
/**
* 獲取當前作業伺服器分片上下文.
*
* @param shardingItems 分片項
* @return 分片上下文
*/
public ShardingContexts getJobShardingContext(final List<Integer> shardingItems) {
// 從zk獲取作業配置
LiteJobConfiguration liteJobConfig = configService.load(false);
// 移除當前正在執行的作業分片
removeRunningIfMonitorExecution(liteJobConfig.isMonitorExecution(), shardingItems);
if (shardingItems.isEmpty()) { // 如果分片項為空,則建立一個分片上下文
// 如 [email protected]@分片項用逗號隔開的字串@[email protected]@[email protected]@[email protected]
return new ShardingContexts(buildTaskId(liteJobConfig, shardingItems), liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter(), Collections.<Integer, String>emptyMap());
}
Map<Integer, String> shardingItemParameterMap = new ShardingItemParameters(liteJobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()).getMap();
return new ShardingContexts(buildTaskId(liteJobConfig, shardingItems), liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter(), getAssignedShardingItemParameterMap(shardingItems, shardingItemParameterMap));
}
1.2進入 execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
if (shardingContexts.getShardingItemParameters().isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
// 藉助 Guava 非同步化作業執行軌跡,這樣不會影響任務執行,實現解耦
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
}
return;
}
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
}
try {
process(shardingContexts, executionSource);
} finally {
// TODO 考慮增加作業失敗的狀態,並且考慮如何處理作業失敗的整體迴路
jobFacade.registerJobCompleted(shardingContexts);
if (itemErrorMessages.isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
}
} else {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
}
}
}
}
1.2.1 進入 process(shardingContexts, executionSource);
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) { // 1個分片就單執行緒執行
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
process(shardingContexts, item, jobExecutionEvent);
return;
}
final CountDownLatch latch = new CountDownLatch(items.size());
for (final int each : items) { // 多個分片多執行緒執行
final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
if (executorService.isShutdown()) {
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
process(shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
1.2.1.2 進入 process(shardingContexts, each, jobExecutionEvent);
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(startEvent);
}
log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
JobExecutionEvent completeEvent;
try {
process(new ShardingContext(shardingContexts, item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(completeEvent);
}
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// CHECKSTYLE:ON
completeEvent = startEvent.executionFailure(cause);
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtil.transform(cause));
jobExceptionHandler.handleException(jobName, cause);
}
}
1.2.1.2.1 進入 process(new ShardingContext(shardingContexts, item));// SimpleJobExecutor 類
@Override
protected void process(final ShardingContext shardingContext) {
simpleJob.execute(shardingContext);
}
// 這步再進去就回到我們的測試用例,因為測試用例實現了 SimpleJob 裡
public void execute(ShardingContext context) {
System.out.println("呼叫拉啦啦");
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
1.3故障轉移 jobFacade.failoverIfNecessary(); // FailoverService類
@Override
public void failoverIfNecessary() {
if (configService.load(true).isFailover()) {
failoverService.failoverIfNecessary();
}
}
/**
* 如果需要失效轉移, 則執行作業失效轉移.
*/
public void failoverIfNecessary() {
if (needFailover()) {
// 故障轉移的時候使用 /jobName/leader/failover/latch 來保證故障分片不會被重複轉移
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
// 故障轉移具體內部類
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
return;
}
// 從 jobName/leader/failover/items 抓取需要轉移的作業
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO 不應使用triggerJob, 而是使用executor統一排程。
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
// 這裡程式故障轉移排程一次就行,因為下次分片的時候不會分片到宕機的機器
jobScheduleController.triggerJob();
}
}
}
相關推薦
Elastic-Job-Lite 原始碼閱讀 ---- 作業執行
作業執行的核心流程: 因為使用了 Quartz,任務執行是實現了 Quartz 的 Job 介面: public final class LiteJob implements Job { @Setter private ElasticJob elasticJob
Elastic-Job-Lite 源碼分析 —— 作業分片策略
哈希 AD hash alloc hub strings put iat 總數 摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-sharding-strategy/ 「芋道源碼」歡迎轉載,保留摘要,謝謝! 本文基於 Elast
【芋道原始碼】純原始碼分享公眾號,目前有「Dubbo」「SpringCloud」「Java 併發」「RocketMQ」「Sharding-JDBC」「MyCAT」「Elastic-Job-Lite」「Elastic-
芋道原始碼 純原始碼分享公眾號,目前有「Dubbo」「SpringCloud」「Java 併發」「RocketMQ」「Sharding-JDBC」「MyCAT」「Elastic-Job-Lite」「Elastic-...
Elastic-Job-Lite詳解之作業排程
JobScheduler是elastic-job作業排程的關鍵類,也是起始類,在包com.dangdang.ddframe.job.lite.api下。排程任務的執行需要包含兩大步驟:任務的配置和任務的註冊。JobScheduler的建構函式除了任務配置和註冊相關資訊之
Elastic-Job-Lite 源碼分析 —— 運維平臺
job 哈哈哈 配置服務 posit rop cat jet gis nmap 本文基於 Elastic-Job V2.1.5 版本分享 1. 概述 2. Maven模塊 elastic-job-common-restful 3. Maven模塊 el
SpringBoot基礎教程3-1-5 Elastic-Job-lite快速整合
1 概述 Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務,外部依賴僅Zookeeper。 具體參考,官網 2 核心理念 分散式排程 Elastic-Job-Lite並無作業排程中心節點,而是基於部署作業框架的程式在到達相應時間點時各自觸發排程
Job控制檯(elastic job lite console)
elastic job lite console: 設計理念 1.本控制檯和Elastic Job並無直接關係,是通過讀取Elastic Job的註冊中心資料展現作業狀態,或更新註冊中心資料修改全域性配置。 2.控制檯只能控制作業本身是否執行,但不能控制作業程序的啟停,因為控制檯和作業本身伺服器是完全分
一、elastic-job、elastic-job-lite-console使用案例
關於配置檔案每個屬性的作用、怎麼配置,參考噹噹網提供的文件就OK了 elastic-job原始碼 配置郵件有cron表示式,如果不瞭解的,有一個生成cron表示式神器 一、elastic-job是噹噹網開源的分散式框架,實際生產中使用elastic-job做一些定時任務
elastic-job之Dataflow型別作業實現
一、前序二、Dataflow是什麼?Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。三、 怎麼開啟?可通過DataflowJobConfiguration
Elastic-Job專案原始碼分析1--核心骨架JobScheduler
簡介 我覺得,還是原文寫的好,大家可以去看看: 說下我喜歡的原因吧 可以和現有的工程對接 支援任務分片,這個太重要了!! 彈性,水平擴充套件很好;如果有機器宕機,由其他機器執行 視覺化的、web管理平臺 程式碼架構不錯,雖然各種各樣的service
elastic job (一) elastic-job-lite----SimpleJob
首先我們要了解一下什麼是任務排程,排程就是將一個任務套上一個時間,讓該任務可以在時間規律上去迴圈執行。一般的技術quartz、spring task、java.util.Timer,這幾種如果在單一機器上跑其實問題不大,但是如果一旦應用於叢集環境做分散式部署
elastic-job之Simple型別作業實現
1.什麼是Simple型別作業?Simple型別作業意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。2. 建立Simple型別專案,通過API啟動方式呼
Elastic-Job何為分散式作業
原文地址:http://dangdangdotcom.github.io/elastic-job/post/1.x/distribution/ 何為分散式作業? 分片概念 任務的分散式執行,需要將一個任務拆分為n個獨立的任務項,然後由分散式的伺服器分別執行某一個或
Elastic-Job原始碼解析(三)之分片定時任務執行
通過本篇的閱讀你將學會了解Elastic-Job的定時時機,及如何通過分片方式做一個分散式的定時任務框架。瞭解常用的三種分片策略,及如何自定義分散式分片策略 目錄 Elastic-Job如何通過SpringJobScheduler啟動定時 Ela
spark 作業執行原理原始碼閱讀(三)
概述 作業(Job) 排程階段(stage) 任務(Task) DAGScheduler:面向排程階段的任務調節器,負責接收spark應用提交的作業,根據RDD的依賴關係(根據寬依賴劃分)劃分排程階段,並提交stage給TaskScheduler。 TaskScheduler:面
Elastic-Job作業執行狀態監聽
原文地址:http://dangdangdotcom.github.io/elastic-job/post/1.x/execution_monitor/ 作業執行狀態監控 通過監聽elastic-job的zookeeper註冊中心的幾個關鍵節點即可完成作業執行狀態監控
Elastic-Job原始碼解析(二)之定時核心實現quartz
Elastic-Job是一個分散式定時任務框架,其內部的定時主要是利用quartz來實現,而Elastic-Job核心是對quartz進行了封裝,並提供了分散式任務的功能。具體是怎麼實現呢? 怎麼實現分散式呢? 主要是通過Zookeeper通訊,獲取任務伺服器ip地址,並通
Elastic-Job原始碼解析(一)之與Spring完美整合
看過小編寫SpringFramework原始碼解析的同學應該對Spring支援自定義標籤還有點印象吧,沒有的話我們回顧下,然後看看Elastic-Job是如何巧妙的利用自定義標籤生成Job任務的吧。請注意這裡用了一個巧妙關鍵字。我們看它如何巧妙的吧。 Spring自定義
python執行緒同步原語--原始碼閱讀
前面兩篇文章,寫了python執行緒同步原語的基本應用。下面這篇文章主要是通過閱讀原始碼來了解這幾個類的內部原理和是怎麼協同一起工作來實現python多執行緒的。 相關文章連結:python同步原語--執行緒鎖  
netty原始碼閱讀之效能優化工具類之Recycle異執行緒獲取物件
在這篇《netty原始碼閱讀之效能優化工具類之Recycler獲取物件》文章裡面,我們還有一個scavenge()方法沒有解析,也就是在別的執行緒裡面回收物件。下面我們開始介紹,從這個方法開始進入: boolean scavenge() { // con