Elastic Job入門(1) - 簡介
1、如果業務工程採用 叢集化的部署,可能會多次重複執行定時任務而導致系統的業務邏輯錯誤,併產生系統故障。
2、Quartz的叢集方案具備HA功能,可以實現定時任務的分發,但是通過增加機器節點數量的方式並不能提高每次定時任務的執行效率,無法實現任務的 彈性分片。
開源產品Elastic-Job是噹噹開源的一款分散式彈性定時任務排程框架,在2.X版本以後主要分為Elastic-Job-Lite
GitHub地址為:https://github.com/elasticjob/elastic-job-lite
中文官網為:http://elasticjob.io/index_zh.html
從Elastic-Job的架構圖上基本就可以看出,其以Jar的形式為業務工程(諸如,Spring Boot工程)的快速整合提供了簡便的方式。同時,其提供的定時任務分片、彈性擴縮容、失效轉移、作業監控和支援多種作業模式等強大的功能,使業務開發人員 無需在這些方面花費較大多的精力,而可以更加專注於平臺的業務開發
1、 定時任務:基於成熟的定時任務作業框架Quartz cron表示式執行定時任務;
2、 作業註冊中心:基於Zookeeper和其客戶端Curator實現的全域性作業註冊控制中心;作業註冊中心僅用於作業任務註冊和監控資訊的暫存;
3、 定時任務分片:可以將原本一個較大任務分片成為多小的子任務項分別在多個伺服器上同時執行,提高總任務的執行處理效率;
4、 彈性擴容縮容:執行中定時任務所在的伺服器崩潰,或新增加n臺作業伺服器,作業框架將在下次任務執行前重新進行任務排程分發,不影響當前任務的處理與執行;
5、 支援多種任務模式:分別支援Simple、Dataflow和Script型別的定時任務;
6、 失效轉移
7、 執行時定時任務狀態收集:監控任務執行時的狀態,統計最近一段時間任務處理成功和失敗的數量,記錄作業上次執行開始時間,結束時間和下次執行時間;
8、 支援配置定時任務停止、恢復和禁用:用於操作定時任務的啟停,並可以禁止某任務的執行;
9、 Spring支援:Elastic-Job-Lite專案完美支援spring的容器,自定義名稱空間,支援佔位符
10、 運維平臺:提供運維介面,方便開發和運維人員管理生產環境上已經發布的定時任務和註冊中心; 水平分片 橫向拓容 Springboot2預設資料庫連線池選擇了HikariCP
<dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> </dependency>
spring:
datasource:
url: jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
driver-class-name: com.mysql.jdbc.Driver
username: root
password: root
type: com.zaxxer.hikari.HikariDataSource
# 自動建立更新驗證資料庫結構
jpa:
hibernate:
ddl-auto: update
show-sql: true
database: mysql
==========================================API配置啟動==========================================
引入Maven
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>${latest.release.version}</version> </dependency>
方法引數ShardingContext包含作業配置,分片和執行時資訊。可通過getShardingTotalCount(),getShardingItems()等方法分別獲取分片總數,執行在本作業伺服器的分片序列號集合等。
Simple型別作業
與Quartz介面相似,用於執行普通的定時任務,只是增加了彈性拓容和分片等功能:
public class MyElasticJob implements SimpleJob { @Override public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
Dataflow型別作業
Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。
public class MyElasticJob implements DataflowJob<Foo> { @Override public List<Foo> fetchData(ShardingContext context) { switch (context.getShardingItem()) { case 0: List<Foo> data = // get data from database by sharding item 0 return data; case 1: List<Foo> data = // get data from database by sharding item 1 return data; case 2: List<Foo> data = // get data from database by sharding item 2 return data; // case n: ... } } @Override public void processData(ShardingContext shardingContext, List<Foo> data) { // process data // ... } }
流式處理
可通過DataflowJobConfiguration配置是否流式處理。流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。如果採用流式作業處理方式,建議processData處理資料後更新其狀態,避免fetchData再次抓取到,從而使得作業永不停止。 流式資料處理參照TbSchedule設計,適用於不間歇的資料處理。
作業配置
Elastic-Job配置分為3個層級,分別是Core, Type和Root。每個層級使用相似於裝飾者模式的方式裝配。
Core對應JobCoreConfiguration,用於提供作業核心配置資訊,如:作業名稱、分片總數、CRON表示式等。
Type對應JobTypeConfiguration,有3個子類分別對應SIMPLE, DATAFLOW和SCRIPT型別作業,提供3種作業需要的不同配置,如:DATAFLOW型別是否流式處理或SCRIPT型別的命令列等。
Root對應JobRootConfiguration,有2個子類分別對應Lite和Cloud部署型別,提供不同部署型別所需的配置,如:Lite型別的是否需要覆蓋本地配置或Cloud佔用CPU或Memory數量等。
// 定義作業核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build(); // 定義SIMPLE型別配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName()); // 定義Lite作業根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
作業啟動
public class JobDemo { public static void main(String[] args) { new JobScheduler(createRegistryCenter(), createJobConfiguration()).init(); } private static CoordinatorRegistryCenter createRegistryCenter() { CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo")); regCenter.init(); return regCenter; } private static LiteJobConfiguration createJobConfiguration() { // 建立作業配置 // ... } }
==========================================Spring配置啟動==========================================
引入Maven
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>${latest.release.version}</version> </dependency>
作業配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd "> <!--配置作業註冊中心 --> <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /> <!-- 配置作業--> <job:simple id="demoSimpleSpringJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> </beans>
作業啟動
將配置Spring名稱空間的xml通過Spring啟動,作業將自動載入。