Elastic-Job之簡單Job
簡介
elastic-job是噹噹網開源的基於zookeeper和quartz實現的分散式作業排程框架。github地址是https://github.com/dangdangdotcom/elastic-job,官方網站是http://elasticjob.io/。elastic-job分elastic-job-lite和elastic-job-cloud,elastic-job-lite定位為輕量級的無中心化解決方案,本文要介紹的用法也是基於elastic-job-lite的。官方的文件其實挺齊全的,本文旨在對elastic-job的應用做一個簡單的介紹,也算是完善自己的知識庫,詳細的資訊請參考官方網站
核心概念
- job:即要執行的任務
- 分片:即把任務拆分成多個片段,分別排程。這些片段可以落在不同的節點上,所以在實現任務排程的介面時需要根據當前的片段來進行操作,否則就失去了意義。片段數是從0開始的,比如總的分片數是6,一共有兩臺機器,則第一臺機器上分配的片段數將是0,1,2,而第二臺機器上分配的片段數將是3,4,5。
- 重新分片:當有新的機器加入或者有機器宕機的時候都將觸發重新分片。因為分片本來就是把總的片段數平均分配給不同的節點,節點數變了,每臺機器能夠分配的片段必將發生變化。
簡單任務
簡單任務對應於com.dangdang.ddframe.job.api.simple.SimpleJob介面,該介面的定義如下:
public interface SimpleJob extends ElasticJob {
/**
* 執行作業.
*
* @param shardingContext 分片上下文
*/
void execute(ShardingContext shardingContext);
}
該介面只定義了一個方法,用於執行需要的任務,你可以把你的定時作業需要執行的邏輯在此方法中實現。elastic-job定時排程時就會排程該execute方法。該方法只接收一個ShardingContext型別的引數。該引數中包含了任務排程一些比較核心的資訊,比如分片總數、當前的分片等。任務的實現需要根據當前的片段數來進行,否則可能達不到你的預期效果。以下是一個簡單的示例。
/**
* 普通作業,與Quartz的定時作業類似,只是會多了分片等功能
* @author Elim
* 2016年10月29日
*/
public class MyElasticJob implements SimpleJob {
private static final Logger LOGGER = Logger.getLogger(MyElasticJob.class);
@Override
public void execute(ShardingContext context) {
//當你的作業是分片的時候,你需要在你的Job的execute方法中根據當前的分片shardingItem的不同取值實現不同的邏輯,
//要把所有的shardingItem都覆蓋到,因為在分散式環境,每臺機器都不能確保它當前的分片是哪一個,並且我們需要保持程式
//的一致性,程式編寫好了對部署是不會有影響的。
int shardingItem = context.getShardingItem();
switch (shardingItem) {
case 0:
LOGGER.info("處理第一個分片");
break;
case 1:
LOGGER.info("處理第二個分片");
break;
case 2:
LOGGER.info("處理第三個分片");
break;
case 3:
LOGGER.info("處理第四個分片");
break;
case 4:
LOGGER.info("處理第五個分片");
break;
case 5:
LOGGER.info("處理第六個分片");
break;
}
LOGGER.info(context);
}
}
配置任務
定義好了作業任務的實現類後為了使作業任務生效,我們需要對其進行配置。配置有兩種方式,基於API的配置和基於Spring名稱空間的配置。以下介紹的都是基於Spring名稱空間的配置。
引入名稱空間
首先需要引入reg和job名稱空間,示例如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
</beans>
配置註冊中心
reg用於配置作用註冊中心,即配置zookeeper。
<reg:zookeeper id="regCenter" server-lists="localhost:2181"
namespace="dd-job" base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000" max-retries="3" />
- id用於給該註冊中心命名。
- server-lists用於指定使用的zookeeper的地址,多個地址之間用英文的逗號分隔。
- namespace用於指定註冊中心在zookeeper中的名稱空間,屬於zookeeper的概念。
- base-sleep-time-milliseconds用於指定等待重試的間隔時間的初始值,單位是毫秒。
- max-sleep-time-milliseconds用於指定等待重試的間隔時間的最大值,單位是毫秒。
- max-retries用於指定最大的重試次數。
配置作業
作業通過job名稱空間配置,簡單任務通過<job:simple/>
指定。
<job:simple id="myElasticJob" class="com.elim.learn.elastic.job.MyElasticJob"
registry-center-ref="regCenter" cron="0/30 * * * * ?"
sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
failover="true" overwrite="true" />
- id用於給該任務命名。
- class用於指定需要應用的SimpleJob實現類。
- registry-center-ref用於指定需要使用的註冊中心。
- cron用於指定定時排程的規則,應用cron表示式的語法。
- sharding-total-count用於指定總的分片數。
- sharding-item-parameters用於指定每片對應的引數,該引數可以通過ShardingContext的getShardingParameter()獲取。
- failover用於指定是否需要開啟失效轉移。只有在monitorExecution為true的情況下才有效,可以通過
<job:simple monitor-execution="true"/>
來指定,不過該屬性值預設也是true。 - overwrite用於指定該配置是否需要用來覆蓋註冊中心的配置。修改了配置後一定要記得指定該屬性值為true,否則還是使用的註冊中心的舊的配置。
使用上面的配置後,在Spring容器啟動後,我們的作業就會每30秒排程一次了。如果只有一臺機器,那麼上面的6片都會落到同一臺機器上,一共會發起6次排程。如果有兩臺機器就是每臺會得到三個分片,以此類推。當機器數量超出了分片數後,有的機器就會得不到分片,就沒有排程的機會,除非有機器宕機了,觸發了重新分片。
需要注意的是節點必須是在不同的機器上執行才行,一臺機器上啟動多個JVM是不會被認為是多個節點的,因為elastic-job是以客戶端的IP地址來識別一個節點的。
完整配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<description>
官方文件:http://elasticjob.io/index.html
</description>
<!-- 如果需要做分散式作業排程,則對應的例項必須是在多臺機器上跑的,因為elastic-job是以IP來區分一個節點的;另外namespace和使用的
zookeeper也必須是一樣的 -->
<!--配置作業註冊中心,server-lists用於指定zookeeper的地址,多個zookeeper之間用逗號分隔;
namespace用於指定zookeeper名稱空間;
max-retries用於指定最大重試次數 -->
<reg:zookeeper id="regCenter" server-lists="localhost:2181"
namespace="dd-job" base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 可通過在http://repo1.maven.org/maven2/com/dangdang/elastic-job-console/1.1.1/下載對應的war包監控elastic-job的執行狀態 -->
<!-- 配置作業 -->
<!-- 引數overwrite為true即允許客戶端的作業配置覆蓋註冊中心的配置,每次啟動服務都會將客戶端的覆蓋註冊中心的, 預設為false。引數failover表示是否開啟失效轉移,預設為false,其它引數配置請參考官方文件 -->
<!-- sharding-total-count引數用於指定分片數,當分片數大於機器數量的時候,每臺機器分配到的片數會是平均的, 第一片是從0開始的,比如總共分6片,有兩臺機器,則第一臺機器會分得0,1,2三片,而第二臺機器會分得3,4,5三片;當有
機器宕機了或者有新機器加入的時候都會觸發重新分片。如果有多臺機器,而分片總數是1的時候即相當於1主多從的配置。 sharding-item-parameters用於指定與分片對應的別名。
job-sharding-strategy-class:可以通過它來指定作業分片策略,可選策略可參考官方文件https://github.com/dangdangdotcom/elastic-job/blob/master/elastic-job-doc/content/post/user_guide/lite/other/lite_job_strategy.md。 -->
<!-- SimpleJob的執行可以參考原始碼com.dangdang.ddframe.job.executor.type.SimpleJobExecutor的處理邏輯 -->
<job:simple id="myElasticJob" class="com.elim.learn.elastic.job.MyElasticJob"
registry-center-ref="regCenter" cron="0/30 * * * * ?"
sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
failover="true" overwrite="true" />
</beans>
如果你的job已經定義為了Spring的一個bean,那麼在定義<job:simple/>
時也可以不指定class,而是指定job-ref屬性關聯job對應的bean,如:
<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/>
<job:simple id="myElasticJob" job-ref="simpleJob"
registry-center-ref="regCenter" cron="0/30 * * * * ?"
sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
failover="true" overwrite="true" />
這裡的bean需要定義在
<job:simple/>
的前面,否則會提示找不到對應的bean定義。
(本文由Elim寫於2017年10月1日)