SpringBoot整合Elastic-Job,實現動態建立定時任務,任務持久化
SpringBoot使用Elastic-Job-lite,實現動態建立定時任務,任務持久化
Elastic-Job是噹噹開源的一個分散式排程解決方案,由兩個相互獨立的子專案Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務;Elastic-Job-Cloud採用自研Mesos Framework的解決方案,額外提供資源治理、應用分發以及程序隔離等功能。
這裡以Elastic-Job-lite為例,跟SpringBoot進行整合,噹噹的官方文件中並沒有對SpringBoot整合作說明,所有的配置都是基於文件中的xml的配置修改出來的。
起步
準備好一個SpringBoot的專案,pom.xml中引入Elastic-job,mysql,jpa等依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId >elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
</dependencies>
配置
使用yaml進行相關屬性的配置,主要配置的是資料庫連線池,jpa
elasticjob:
serverlists: 172.31.31.48:2181
namespace: boot-job
spring:
datasource:
url: jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
driver-class-name: com.mysql.jdbc.Driver
username: root
password: root
type: com.zaxxer.hikari.HikariDataSource
# 自動建立更新驗證資料庫結構
jpa:
hibernate:
ddl-auto: update
show-sql: true
database: mysql
elastic-job相關的配置使用java配置實現,代替官方文件的xml配置
@Configuration
@Data
@ConfigurationProperties(prefix = "elasticjob")
public class ElasticJobConfig {
private String serverlists;
private String namespace;
@Resource
private HikariDataSource dataSource;
@Bean
public ZookeeperConfiguration zkConfig() {
return new ZookeeperConfiguration(serverlists, namespace);
}
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
return new ZookeeperRegistryCenter(config);
}
/**
* 將作業執行的痕跡進行持久化到DB
*
* @return
*/
@Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
@Bean
public ElasticJobListener elasticJobListener() {
return new ElasticJobListener(100, 100);
}
}
所有相關的配置到這裡就已經OK了,接下來開始具體的編碼實現
定時任務實現
先實現一個自己的任務類,需要實現elastic-job提供的SimpleJob介面,實現它的execute(ShardingContext shardingContext)方法
@Slf4j
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
//打印出任務相關資訊,JobParameter用於傳遞任務的ID
log.info("任務名:{}, 片數:{}, id={}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
shardingContext.getJobParameter());
}
}
接下來實現一個分散式的任務監聽器,如果任務有分片,分散式監聽器會在總的任務開始前執行一次,結束時執行一次。監聽器在之前的ElasticJobConfig已經註冊到了Spring容器之中。
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
@Resource
private TaskRepository taskRepository;
public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
}
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
}
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
//任務執行完成後更新狀態為已執行
JobTask jobTask = taskRepository.findOne(Long.valueOf(shardingContexts.getJobParameter()));
jobTask.setStatus(1);
taskRepository.save(jobTask);
}
}
實現一個ElasticJobHandler,用於向Elastic-job中新增指定的作業配置,作業配置分為3級,分別是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,層層巢狀。
@Component
public class ElasticJobHandler {
@Resource
private ZookeeperRegistryCenter registryCenter;
@Resource
private JobEventConfiguration jobEventConfiguration;
@Resource
private ElasticJobListener elasticJobListener;
/**
* @param jobName
* @param jobClass
* @param shardingTotalCount
* @param cron
* @param id 資料ID
* @return
*/
private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
Class<? extends SimpleJob> jobClass,
int shardingTotalCount,
String cron,
String id) {
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).jobParameter(id).build(), jobClass.getCanonicalName()));
}
/**
* 新增一個定時任務
*
* @param jobName 任務名
* @param cron 表示式
* @param shardingTotalCount 分片數
*/
public void addJob(String jobName, String cron, Integer shardingTotalCount, String id) {
LiteJobConfiguration jobConfig = simpleJobConfigBuilder(jobName, MyElasticJob.class, shardingTotalCount, cron, id)
.overwrite(true).build();
new SpringJobScheduler(new MyElasticJob(), registryCenter, jobConfig, jobEventConfiguration, elasticJobListener).init();
}
}
到這裡,elastic-job的註冊中心,資料來源相關配置,以及動態新增的邏輯已經做完了,接下來在service中呼叫上面寫好的方法,驗證功能是否正常。
編寫一個ElasticJobService類,掃描資料庫中狀態為0的任務,並且把這些任務新增到Elastic-job中,這裡的相關資料庫操作使用了spring-data-jpa,dao層相關程式碼就不貼了,可以在原始碼中檢視。
@Service
public class ElasticJobService {
@Resource
private ElasticJobHandler jobHandler;
@Resource
private TaskRepository taskRepository;
/**
* 掃描db,並新增任務
*/
public void scanAddJob() {
Specification query = (Specification<JobTask>) (root, criteriaQuery, criteriaBuilder) -> criteriaBuilder
.and(criteriaBuilder.equal(root.get("status"), 0));
List<JobTask> jobTasks = taskRepository.findAll(query);
jobTasks.forEach(jobTask -> {
Long current = System.currentTimeMillis();
String jobName = "job" + jobTask.getSendTime();
String cron;
//說明消費未傳送,但是已經過了訊息的傳送時間,調整時間繼續執行任務
if (jobTask.getSendTime() < current) {
//設定為一分鐘之後執行,把Date轉換為cron表示式
cron = CronUtils.getCron(new Date(current + 60000));
} else {
cron = CronUtils.getCron(new Date(jobTask.getSendTime()));
}
jobHandler.addJob(jobName, cron, 1, String.valueOf(jobTask.getId()));
});
}
}
在Junit中新增幾條測試資料
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class JobTaskTest {
@Resource
private TaskRepository taskRepository;
@Test
public void add() {
//生成幾個任務,第一任務在三分鐘之後
Long unixTime = System.currentTimeMillis() + 60000;
JobTask task = new JobTask("test-msg-1", 0, unixTime);
taskRepository.save(task);
unixTime += 60000;
task = new JobTask("test-msg-2", 0, unixTime);
taskRepository.save(task);
unixTime += 60000;
task = new JobTask("test-msg-3", 0, unixTime);
taskRepository.save(task);
unixTime += 60000;
task = new JobTask("test-msg-4", 0, unixTime);
taskRepository.save(task);
}
}
此時,資料庫中多了四條狀態為0的資料
最後,就可以開始驗證整個流程了,程式碼如下
@SpringBootApplication
public class ElasticJobApplication implements CommandLineRunner {
@Resource
private ElasticJobService elasticJobService;
public static void main(String[] args) {
SpringApplication.run(ElasticJobApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
elasticJobService.scanAddJob();
}
}
可以看到,在啟動過程中,多個任務被加入到了Elastic-job中,並且一小段時間之後,任務一次執行,執行成功之後,因為我們配置了監聽器,會列印資料庫的更新SQL,當任務執行完成,再檢視資料庫,發現狀態也更改成功。資料庫中同時也會多出兩張表JOB_EXECUTION_LOG,JOB_STATUS_TRACE_LOG,這是我們之前配置的JobEventConfiguration,通過資料來源持久化了作業配置的相關資料,這兩張表的資料可以供Elastic-job提供的運維平臺使用,具體請檢視官方文件。
總結
至此,整個流程就已經走完了,整個demo中主要用到了Elastic-job和spring-data-jpa相關的技術,作為demo,肯定會有一些缺陷,沒考慮到的地方,可以根據自己的業務場景進行改進。
最後,附上github原始碼,歡迎star,一起交流。上面涉及到的資料庫,請自行建立,表會自動生成。原始碼地址