分散式定時任務框架Elastic-Job的使用
一、前言
Elastic-Job是一個優秀的分散式作業排程框架。
Elastic-Job是一個分散式排程解決方案,由兩個相互獨立的子專案Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務。
Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及程序隔離等服務。
1. Elastic-Job-Lite
-
分散式排程協調
-
彈性擴容縮容
-
失效轉移
-
錯過執行作業重觸發
-
作業分片一致性,保證同一分片在分散式環境中僅一個執行例項
-
自診斷並修復分散式不穩定造成的問題
-
支援並行排程
-
支援作業生命週期操作
-
豐富的作業型別
-
Spring整合以及名稱空間提供
-
運維平臺
2. Elastic-Job-Cloud
-
應用自動分發
-
基於Fenzo的彈性資源分配
-
分散式排程協調
-
彈性擴容縮容
-
失效轉移
-
錯過執行作業重觸發
-
作業分片一致性,保證同一分片在分散式環境中僅一個執行例項
-
支援並行排程
-
支援作業生命週期操作
-
豐富的作業型別
-
Spring整合
-
運維平臺
-
基於Docker的程序隔離(TBD)
二、導讀
1、Elastic-Job的核心思想
2、Elastic-Job的基本使用
三、Elastic-Job的核心思想
對於分散式計算而言,分片是最基本的思想,Elastic-Job也是沿用了這個思想,每個job跑部分資料,所有job執行完成,便是全量資料,官網給出的SimpleJob例子如下:
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
用switch case迴圈來對應分片的業務邏輯,case分片的index,進入業務邏輯執行。當然這裡也有不適應的場景,類似於MapReduce需要shuffle的場景就不適合了,比方說,要根據某一個欄位全域性分組聚合求結果,這時候怎麼分片都可能會不合理,因為每個分片只能處理N分之一的資料,沒辦法shuffle再聚合,這一點,也要根據具體的業務來使用。
那麼ShardingContext可以拿到那些資訊呢?原始碼如下
public final class ShardingContext {
/**
* 作業名稱.
*/
private final String jobName;
/**
* 作業任務ID.
*/
private final String taskId;
/**
* 分片總數.
*/
private final int shardingTotalCount;
/**
* 作業自定義引數.
* 可以配置多個相同的作業, 但是用不同的引數作為不同的排程例項.
*/
private final String jobParameter;
/**
* 分配於本作業例項的分片項.
*/
private final int shardingItem;
/**
* 分配於本作業例項的分片引數.
*/
private final String shardingParameter;
public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
jobName = shardingContexts.getJobName();
taskId = shardingContexts.getTaskId();
shardingTotalCount = shardingContexts.getShardingTotalCount();
jobParameter = shardingContexts.getJobParameter();
this.shardingItem = shardingItem;
shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
}
}
以上程式碼,jobParameter和shardingItem是最有用的引數,shardingItem決定switch case迴圈的走向,shardingParameter可以用業務的查詢條件,也可以用字串拼接的方式組裝很複雜的引數用於特定的業務。
四、Elastic-Job的基本使用
1、Job配置項
public class ElasticJobConfig {
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job");
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
regCenter.init();
return regCenter;
}
private static LiteJobConfiguration createJobConfiguration() {
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3)
.shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build();
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
MyElasticJob.class.getCanonicalName());
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true)
.build();
return simpleJobRootConfig;
}
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
}
幾點說明:
註冊中心配置項,設定zookeeper叢集地址,我這裡用的本地單節點,所以只有一個,當然可以配置任務名稱,名稱空間(namespace,本質上會在zk裡生成一個目錄),超時時間,最大重試次數等等
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite引數非常重要,設定這個引數為true,修改過job配置資訊才會覆蓋zookeeper裡的資料,要不然不會生效。
2、SimpleJob的實現
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()) {
case 0: {
System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "引數:"
+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
break;
}
case 1: {
System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "引數:"
+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
break;
}
case 2: {
System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "引數:"
+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
break;
}
default: {
System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "引數:"
+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
break;
}
}
}
}
上面設定每5秒鐘執行一次,執行ElasticJobConfig的main方法,執行結果如下:
從上面的結果,可以看出,執行每個分片的任務,其實是放到一個執行緒池去執行的,對應的分片資訊和引數資訊在shardingContext可以拿到,實現業務非常方便。
最後,如果啟動多個JVM,那麼這些任務就分散到各個節點裡,如果一個節點宕機,下次觸發任務時,將把該分片任務丟到健康機器執行,這裡做到了節點容錯。但是某個分片的任務在執行過程中失敗了,那麼這裡是不會重新觸發改分片任務的執行的。
&nb