Elastic-Job:動態新增任務,支援動態分片
阿新 • • 發佈:2019-09-19
多情只有春庭月,猶為離人照落花。
概述
因專案中使用到定時任務,且服務部署多例項,因此需要解決定時任務重複執行的問題。即在同一時間點,每一個定時任務只在一個節點上執行。常見的開源方案,如 elastic-job
、 xxl-job
、quartz
、 saturn
、 opencron
、 antares
等。最終決定使用elastic-job
。elastic-job
的亮點主要如下:
- 基於quartz 定時任務框架為基礎的,因此具備quartz的大部分功能
- 使用zookeeper做協調,排程中心,更加輕量級
- 支援任務的分片
- 支援彈性擴容 , 可以水平擴充套件 , 當任務再次執行時,會檢查當前的伺服器數量,重新分片,分片結束之後才會繼續執行任務
- 失效轉移,容錯處理,當一臺排程伺服器宕機或者跟zookeeper斷開連線之後,會立即停止作業,然後再去尋找其他空閒的排程伺服器,來執行剩餘的任務
- 提供運維介面,可以管理作業和註冊中心
但在實際開發中發現elastic-job
對於動態新增的定時任務不支援分片。即在多例項情況下,在某個例項上動態新增任務,則該任務會一直在這一臺節點上執行。如果需要在其它例項上執行,則需要以相同的引數呼叫其它例項介面。參考:elastic-job:動態進行任務的新增。在多次百度+google
下發現Elastic-Job動態新增任務這裡與樓主遇到了相同的問題。但經樓主測試動態新增任務的分片時好時壞,且只要在zookeeper
解決
順著尹大的思路,將任務的節點都集中管理起來,無論動態任務在哪個節點上進行註冊,都需要將這個請求轉發到其他的節點上進行初始化操作,這樣就可以保證多節點分片的任務正常執行。
程式碼如下:
/** * 開啟任務監聽,當有任務新增時,監聽zk中的資料增加,自動在其他節點也初始化該任務 */ public void monitorJobRegister() { CuratorFramework client = zookeeperRegistryCenter.getClient(); @SuppressWarnings("resource") PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true); PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: String config = new String(client.getData().forPath(data.getPath() + "/config")); Job job = JsonUtils.toBean(Job.class, config); Object bean = null; // 獲取bean失敗則新增任務 try { bean = ctx.getBean("SpringJobScheduler" + job.getJobName()); } catch (BeansException e) { logger.error("ERROR NO BEAN,CREATE BEAN SpringJobScheduler" + job.getJobName()); } if (Objects.isNull(bean)) { addJob(job); } break; default: break; } } }; childrenCache.getListenable().addListener(childrenCacheListener); try { // https://blog.csdn.net/u010402202/article/details/79581575 childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { e.printStackTrace(); } }
測試
測試動態新增定時任務,支援分片失效轉移。
- 下載elastic-job-spring-boot-starter 使用
maven
命令install
到本地 - 建立
demo-elastic-job
專案
目錄結構如下:
demo-elastic-job
├── mvnw
├── mvnw.cmd
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ └── demo
│ │ │ ├── job
│ │ │ │ ├── DynamicJob.java
│ │ │ │ └── TestJob.java
│ │ │ └── DemoApplication.java
│ │ └── resources
│ │ ├── application.yml
│ │ └── application-dev.yml
│ └── test
│ └── java
│ └── com
│ └── example
│ └── demo
│ └── DemoApplicationTests.java
├── pom.xml
└── demo.iml
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.cxytiandi</groupId>
<artifactId>elastic-job-spring-boot-starter</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
DemoApplication.java
package com.example.demo;
import com.cxytiandi.elasticjob.annotation.EnableElasticJob;
import com.cxytiandi.elasticjob.dynamic.service.JobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@EnableElasticJob
@ComponentScan(basePackages = {"com.cxytiandi", "com.example.demo"})
public class DemoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Autowired
private JobService jobService;
@Override
public void run(String... args) throws Exception {
// 模擬初始化讀取資料庫 新增任務
// Job job1 = new Job();
// job1.setJobName("job1");
// job1.setCron("0/10 * * * * ? ");
// job1.setJobType("SIMPLE");
// job1.setJobClass("com.example.demo.job.DynamicJob");
// job1.setShardingItemParameters("");
// job1.setShardingTotalCount(2);
// jobService.addJob(job1);
// Job job2 = new Job();
// job2.setJobName("job2");
// job2.setCron("0/10 * * * * ? ");
// job2.setJobType("SIMPLE");
// job2.setJobClass("com.example.demo.job.DynamicJob");
// job2.setShardingItemParameters("0=A,1=B");
// job2.setShardingTotalCount(2);
// jobService.addJob(job2);
}
}
TestJob.java
package com.example.demo.job;
import com.cxytiandi.elasticjob.annotation.ElasticJobConf;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
/**
* Created by zhenglongfei on 2019/7/22
*
* @VERSION 1.0
*/
@Component
@Slf4j
@ElasticJobConf(name = "dayJob", cron = "0/10 * * * * ?", shardingTotalCount = 2,
shardingItemParameters = "0=AAAA,1=BBBB", description = "簡單任務", failover = true)
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("TestJob任務名:【{}】, 片數:【{}】, param=【{}】", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
shardingContext.getShardingParameter());
}
}
DynamicJob.java
package com.example.demo.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Created by zhenglongfei on 2019/7/24
*
* @VERSION 1.0
*/
@Component
@Slf4j
public class DynamicJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()) {
case 0:
log.info("【0】 is running");
break;
case 1:
log.info("【1】 is running");
break;
}
}
}
application.yml
elastic:
job:
zk:
serverLists: 172.25.66.137:2181
namespace: demo_test
server:
port: 8082
spring:
redis:
host: 127.0.0.1
port: 6379
測試結果
啟動兩個專案分別為8081
和8082
埠,使用REST API
來動態的註冊任務。
- job
http://localhost:8081/job post
引數如下:
{
"jobName": "DynamicJob01",
"cron": "0/3 * * * * ?",
"jobType": "SIMPLE",
"jobClass": "com.example.demo.job.DynamicJob",
"jobParameter": "test",
"shardingTotalCount": 2,
"shardingItemParameters": "0=AAAA,1=BBBB"
}
程式碼下載
- github:demo-elastic-job
- github:elastic-job-spring-boot-starter