分散式任務框架--Elastic-job
一、簡介 Elastic-job是一個分散式排程解決方案,由2個相互獨立的子專案Elastic-job-Lite和Elastic-Job-cloud組成; Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務;Elastic-Job-Cloud採用噹噹網自研Mesos Framework的解決方案,額外提供資源治理,應用分發以及程序隔離等功能; 官方開源資料:https://github.com/dangdangdotcom/elastic-job 二、基本概念 我們開發定時任務一般都是使用quartz或者spring-task(ScheduledExecutorService),無論是使用quartz還是spring-task,我們都會至少遇到兩個痛點: 1.不敢輕易跟著應用服務多節點部署,可能會重複多次執行而引發系統邏輯的錯誤。 2.quartz的叢集僅僅只是用來單機鎖表防止重複多次執行,節點數量的增加並不能給我們的每次執行效率帶來提升,即不能實現水平擴充套件。 Elastic-Job主要的設計理念是無中心化的分散式定時排程框架,思路來源於Quartz的基於資料庫的高可用方案。但資料庫沒有分散式協調功能,所以在高可用方案的基礎上增加了彈性擴容和資料分片的思路,以便於更大限度的利用分散式伺服器的資源。 三、Elastic-Job-Lite原理 Elastic-Job在2.x之後,出了兩個產品線:Elastic-Job-Lite和Elastic-Job-Cloud。我們一般使用Elastic-Job-Lite就能夠滿足需求,本文也是以Elastic-Job-Lite為主。1.x系列對應的就只有Elastic-Job-Lite,並且在2.x裡修改了一些核心類名,差別雖大,原理類似,建議使用2.x系列
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.0.5</version>
</dependency>
<!-- 使用springframework自定義名稱空間時引入 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.0.5</version>
</dependency>
2、Simple作業開發 意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardIndx = shardingContext.getShardingItem();
if (shardIndx == 0) {
//處理id為奇數的商家
System.out.println(String.format("------Thread ID: %s, Total number of task slices: %s, current fragmentation items: %s",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
} else {
//處理id為偶數的商家
System.out.println(String.format("------Thread ID: %s, Total number of task slices: %s, current fragmentation items: %s",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
}
/**
* 實際開發中,有了任務總片數和當前分片項,就可以對任務進行分片執行了
* 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem
*/
}
}
3、作業配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">
<!--配置作業註冊中心 -->
<reg:zookeeper id="regCenter" server-lists="172.16.150.247:2181" namespace="dd-job"
base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3"/>
<!-- 配置作業-->
<job:simple id="mySimpleJob" class="com.liuxl.elastic.MySimpleJob" registry-center-ref="regCenter"
sharding-total-count="2" cron="0/2 * * * * ?" overwrite="true"/>
<!--failover:是否開啟任務執行失效轉移,開啟表示如果作業在一次任務執行中途宕機,允許將該次未完成的任務在另一作業節點上補償執行
description:作業描述
overwrite:本地配置是否可覆蓋註冊中心配置,如果可覆蓋,每次啟動作業都以本地配置為準
event-trace-rdb-data-source:作業事件追蹤的資料來源Bean引用
—>
</beans>
4、單機執行 本機執行 5、叢集執行 本機執行、132機器 6、Dataflow作業開發 public class MyElasticJob implements DataflowJob<List> {
@Override
public List<List<Object>> fetchData(ShardingContext shardingContext) {
int shardIndx = shardingContext.getShardingItem();
File file = new File("/Users/liuxl/Desktop/worker/智慧宣教引擎/20181012.xlsx");
List<List<Object>> reslut = new ArrayList<>();
try {
List<ExcelSheetPO> pos = ExcelUtil.readExcel(file, 400, 3);
if (shardIndx == 0) {
List<List<Object>> list = pos.get(0).getDataList();
//不要病區id
for (List<Object> objects : list) {
objects.remove(1);
}
reslut = list;
} else {
List<List<Object>> list = pos.get(0).getDataList();
// 不要診斷名字
for (List<Object> objects : list) {
objects.remove(0);
}
reslut = list;
}
} catch (IOException e) {
e.printStackTrace();
}
return reslut;
}
@Override
public void processData(ShardingContext shardingContext, List<List<Object>> list) {
List<Object> objects = list.get(0);
System.out.println(
String.format("------Thread ID: %s, Total number of task slices: %s, current fragmentation items: %s,content : %s", Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), JSONObject.toJSON(objects)
));
}
}
7、作業配置
<job:dataflow id="myDataFlowJob" class="com.liuxl.elastic.MyElasticJob" registry-center-ref=“regCenter" sharding-total-count="2" cron="0/2 * * * * ?" streaming-process="true" overwrite=“true" job-exceptionhandler=“com.liuxl.elastic.exception.MyJobExceptionHandler"/>
七、任務日誌捕捉 elastic-job允許使用者在任務排程異常時指定處理異常的異常處理器,異常處理器由介面JobExceptionHandler定義,定義如下
public interface JobExceptionHandler {
void handleException(String var1, Throwable var2);
}
如果沒有指定自己的異常處理器elastic-job預設將使用DefaultJobExceptionHandler處理異常,其定義如下:
public final class DefaultJobExceptionHandler implements JobExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(DefaultJobExceptionHandler.class);
public DefaultJobExceptionHandler() {
}
public void handleException(String jobName, Throwable cause) {
log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
}
}
以下是一個自定義的異常處理器的示例:
public class MyJobExceptionHandler implements JobExceptionHandler {
private static final Logger logger = Logger.getLogger(MyJobExceptionHandler.class);
@Override
public void handleException(String jobName, Throwable cause) {
logger.error(String.format("任務[%s]排程異常", jobName), cause);
}
}
異常處理器的配置是通過job-exception-handler屬性指定的,所有作業型別的異常處理器的配置是通用的:
<job:dataflow id="myDataFlowJob" class="com.liuxl.elastic.MyElasticJob" registry-center-ref="regCenter"
sharding-total-count="2" cron="0/2 * * * * ?" streaming-process="true" overwrite="true"
job-exception-handler="com.liuxl.elastic.exception.MyJobExceptionHandler"/>