SpringBoot基礎教程3-1-3 Quartz定時任務單點持久化
阿新 • • 發佈:2018-12-16
1 概述
實際專案中,複雜的定時任務都會結合持久化,動態改變定時任務狀態,本文將介紹基於Quartz
的定時任務單點持久化方式,通過RESTful
風格,演示定時任務的CRUD
,最後使用Swagger
測試。
2 資料庫表說明
//Quartz表 qrtz_calendars:以 Blob 型別儲存 Quartz 的 Calendar 資訊 qrtz_cron_triggers:儲存 Cron Trigger,包括 Cron 表示式和時區資訊 qrtz_fired_triggers:儲存與已觸發的 Trigger 相關的狀態資訊,以及相聯 Job 的執行資訊 qrtz_paused_trigger_grps:儲存已暫停的 Trigger 組的資訊 qrtz_scheduler_state:儲存少量的有關排程器 (Scheduler) 的狀態,和別的 排程器 (Scheduler)例項(假如是用於一個叢集中) qrtz_locks:儲程式的非觀鎖的資訊(假如使用了悲觀鎖) qrtz_job_details:儲存每一個已配置的 Job 的詳細資訊(jobDetail) qrtz_job_listeners:儲存有關已配置的 Job 監聽器 的資訊 qrtz_simple_triggers:儲存簡單的 Trigger,包括重複次數,間隔,以及已觸的次數 qrtz_blog_triggers:以 Blob 型別儲存的Trigger(用於 Quartz 使用者用 JDBC 建立他們自己定製的 Trigger 型別,JobStore 並不知道如何儲存例項的時候) qrtz_trigger_listeners:儲存已配置的觸發器監聽器 ( Trigger Listener ) 的資訊 qrtz_triggers:儲存已配置的 觸發器 (Trigger) 的資訊 //新建表 ScheduleJob:自定義定時任務詳細狀態表,方便管理定時任務
- 建表指令碼:
/resourecs/quartz.sql
3 新增依賴
<!--quartz相關依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency> <!--資料庫相關依賴--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.9</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.2</version> </dependency>
4 新增配置
#資料庫連線池配置 spring: datasource: name: mysql_test type: com.alibaba.druid.pool.DruidDataSource #druid相關配置 druid: #監控統計攔截的filters filters: stat driver-class-name: com.mysql.jdbc.Driver #基本屬性 url: jdbc:mysql://127.0.0.1:3306/springboot?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true username: root password: 123456 #配置初始化大小/最小/最大 initial-size: 1 min-idle: 1 max-active: 20 #獲取連線等待超時時間 max-wait: 60000 #間隔多久進行一次檢測,檢測需要關閉的空閒連線 time-between-eviction-runs-millis: 60000 #一個連線在池中最小生存的時間 min-evictable-idle-time-millis: 300000 validation-query: SELECT 'x' test-while-idle: true test-on-borrow: false test-on-return: false #開啟PSCache,並指定每個連線上PSCache的大小。oracle設為true,mysql設為false。分庫分表較多推薦設定為false pool-prepared-statements: false max-pool-prepared-statement-per-connection-size: 20 #Quartz配置 quartz: jdbc: initialize-schema: always job-store-type: jdbc ##Mybatis配置 mybatis: #Mapper.xml所在的位置 mapper-locations: classpath:mapping/*.xml #entity掃描的包名 type-aliases-package: com.mkeeper.entity
5 Spring
接管Quartz
@Component
public class ScheduleJobFactory extends AdaptableJobFactory {
// 讓不受spring管理的類具有spring自動注入的特性
@Autowired
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
autowireCapableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
6 Quartz
配置SchedulerFactoryBean
初始化
@Configuration
public class ScheduleConfig {
@Autowired
private ScheduleJobFactory scheduleJobFactory;
@Bean
@Qualifier("scheduleBean")
public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("dataSource") DataSource dataSource) {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
// 名稱
schedulerFactoryBean.setSchedulerName("TASK_EXECUTOR");
// 延遲10秒啟動Scheduler
schedulerFactoryBean.setStartupDelay(10);
// 通過applicationContextSchedulerContextKey屬性配置spring上下文
schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey");
// 設定是否任意一個已定義的Job會覆蓋現有的Job。預設為false,即已定義的Job不會覆蓋現有的Job。
schedulerFactoryBean.setOverwriteExistingJobs(true);
// 自動開始
schedulerFactoryBean.setAutoStartup(true);
// 資料來源
schedulerFactoryBean.setDataSource(dataSource);
// 將JobFactory改為自定義的,否則在 Job 中注入 Bean 會失敗
schedulerFactoryBean.setJobFactory(scheduleJobFactory);
return schedulerFactoryBean;
}
}
7 自定義任務管理
實體
@Data
public class ScheduleJob implements Serializable {
private static final Long serialVersionUID = 1435515995276255188L;
private Long id;
private String className;
private String cronExpression;
private String jobName;
private String jobGroup;
private String triggerName;
private String triggerGroup;
private Boolean pause;
private Boolean enable;
private String description;
private Date createTime;
private Date lastUpdateTime;
}
為了節約篇幅,mapping,dao省略,請參考原始碼
9 建立Quartz
任務排程工具類(重點)
@Slf4j
public class ScheduleUtil {
/**
* 獲取 Trigger Key
*
* @param scheduleJob
* @return
*/
public static TriggerKey getTriggerKey(ScheduleJob scheduleJob) {
return TriggerKey.triggerKey(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup());
}
/**
* 獲取 Job Key
*
* @param scheduleJob
* @return
*/
public static JobKey getJobKey(ScheduleJob scheduleJob) {
return JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
}
/**
* 獲取 Cron Trigger
*
* @param scheduler
* @param scheduleJob
* @return
* @throws ServiceException
*/
public static CronTrigger getCronTrigger(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
try {
return (CronTrigger) scheduler.getTrigger(getTriggerKey(scheduleJob));
} catch (SchedulerException e) {
throw new ServiceException("Get Cron trigger failed", e);
}
}
/**
* 建立任務
*
* @param scheduler
* @param scheduleJob
* @throws ServiceException
*/
public static void createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
validateCronExpression(scheduleJob);
try {
// 要執行的 Job 的類
Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(scheduleJob.getClassName()).newInstance().getClass();
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.withIdentity(scheduleJob.getJobName(), scheduleJob.getJobGroup())
.withDescription(scheduleJob.getDescription())
.build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
.withMisfireHandlingInstructionDoNothing();
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withIdentity(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup())
.withDescription(scheduleJob.getDescription())
.withSchedule(scheduleBuilder)
.startNow()
.build();
scheduler.scheduleJob(jobDetail, cronTrigger);
log.info("Create schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
if (scheduleJob.getPause()) {
pauseJob(scheduler, scheduleJob);
}
} catch (Exception e) {
e.printStackTrace();
log.error("Execute schedule job failed");
throw new ServiceException("Execute schedule job failed", e);
}
}
/**
* 更新任務
*
* @param scheduler
* @param scheduleJob
* @throws ServiceException
*/
public static void updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
validateCronExpression(scheduleJob);
try {
TriggerKey triggerKey = getTriggerKey(scheduleJob);
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
.withMisfireHandlingInstructionDoNothing();
CronTrigger cronTrigger = getCronTrigger(scheduler, scheduleJob);
cronTrigger = cronTrigger.getTriggerBuilder()
.withIdentity(triggerKey)
.withDescription(scheduleJob.getDescription())
.withSchedule(cronScheduleBuilder).build();
scheduler.rescheduleJob(triggerKey, cronTrigger);
log.info("Update schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
if (scheduleJob.getPause()) {
pauseJob(scheduler, scheduleJob);
}
} catch (SchedulerException e) {
e.printStackTrace();
log.error("Update schedule job failed");
throw new ServiceException("Update schedule job failed", e);
}
}
/**
* 執行任務
*
* @param scheduler
* @param scheduleJob
* @throws ServiceException
*/
public static void run(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
try {
scheduler.triggerJob(getJobKey(scheduleJob));
log.info("Run schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
} catch (SchedulerException e) {
e.printStackTrace();
log.error("Run schedule job failed");
throw new ServiceException("Run schedule job failed", e);
}
}
/**
* 暫停任務
*
* @param scheduler
* @param scheduleJob
*/
public static void pauseJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
try {
scheduler.pauseJob(getJobKey(scheduleJob));
log.info("Pause schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
} catch (SchedulerException e) {
e.printStackTrace();
log.error("Pause schedule job failed");
throw new ServiceException("Pause job failed", e);
}
}
/**
* 繼續執行任務
*
* @param scheduler
* @param scheduleJob
* @throws ServiceException
*/
public static void resumeJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
try {
scheduler.resumeJob(getJobKey(scheduleJob));
log.info("Resume schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
} catch (SchedulerException e) {
e.printStackTrace();
log.error("Resume schedule job failed");
throw new ServiceException("Resume job failed", e);
}
}
/**
* 刪除任務
*
* @param scheduler
* @param scheduleJob
* @throws ServiceException
*/
public static void deleteJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
try {
scheduler.deleteJob(getJobKey(scheduleJob));
log.info("Delete schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
} catch (SchedulerException e) {
e.printStackTrace();
log.error("Delete schedule job failed");
throw new ServiceException("Delete job failed", e);
}
}
/**
* 校驗Cron表示式
*/
public static void validateCronExpression(ScheduleJob scheduleJob) throws ServiceException {
if (!CronExpression.isValidExpression(scheduleJob.getCronExpression())) {
throw new ServiceException(String.format("Job %s expression %s is not correct!", scheduleJob.getClassName(), scheduleJob.getCronExpression()));
}
}
}
10 建立定時任務服務類
@Service
public class JobService {
@Resource
private JobMapper jobMapper;
@Resource
private Scheduler scheduler;
public List<ScheduleJob> getAllEnableJob() {
return jobMapper.getAllEnableJob();
}
public ScheduleJob select(Long jobId) throws ServiceException {
ScheduleJob scheduleJob = jobMapper.select(jobId);
if (scheduleJob == null) {
throw new ServiceException("ScheduleJob:" + jobId + " not found");
}
return scheduleJob;
}
@Transactional(rollbackFor = DataAccessException.class)
public ScheduleJob update(Long jobId, ScheduleJob scheduleJob) throws ServiceException {
if (jobMapper.update(scheduleJob) <= 0) {
throw new ServiceException("Update product:" + jobId + "failed");
}
ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);
return scheduleJob;
}
@Transactional(rollbackFor = DataAccessException.class)
public boolean add(ScheduleJob scheduleJob) throws ServiceException {
Integer num = jobMapper.insert(scheduleJob);
if (num <= 0) {
throw new ServiceException("Add product failed");
}
ScheduleUtil.createScheduleJob(scheduler, scheduleJob);
return true;
}
@Transactional(rollbackFor = DataAccessException.class)
public boolean delete(Long jobId) throws ServiceException {
ScheduleJob scheduleJob = select(jobId);
Integer num = jobMapper.delete(jobId);
if (num <= 0) {
throw new ServiceException("Delete product:" + jobId + "failed");
}
ScheduleUtil.deleteJob(scheduler, scheduleJob);
return true;
}
public List<ScheduleJob> getAllJob() {
return jobMapper.getAllJob();
}
public boolean resume(Long jobId) throws ServiceException {
ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
ScheduleUtil.resumeJob(scheduler, scheduleJob);
return true;
}
public boolean pause(Long jobId) throws ServiceException {
ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, true);
ScheduleUtil.pauseJob(scheduler, scheduleJob);
return true;
}
public boolean run(Long jobId) throws ServiceException {
ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
ScheduleUtil.run(scheduler, scheduleJob);
return true;
}
private ScheduleJob updateScheduleJobStatus(Long jobId, Boolean isPause) throws ServiceException {
ScheduleJob scheduleJob = select(jobId);
scheduleJob.setPause(isPause);
update(scheduleJob.getId(), scheduleJob);
return scheduleJob;
}
}
11 建立應用啟動監聽類
/**
* 啟動應用時執行定時任務
*
* @author mkeeper
* @create 2018/10/19 10:05
*/
@Slf4j
@Component
public class ApplicationListener implements CommandLineRunner {
@Resource
private JobService jobService;
@Resource
private Scheduler scheduler;
@Override
public void run(String... args) {
List<ScheduleJob> scheduleJobList = jobService.getAllEnableJob();
for (ScheduleJob scheduleJob : scheduleJobList) {
try {
CronTrigger cronTrigger = ScheduleUtil.getCronTrigger(scheduler, scheduleJob);
if (cronTrigger == null) {
ScheduleUtil.createScheduleJob(scheduler, scheduleJob);
} else {
ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);
}
log.info("Startup {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
} catch (ServiceException e) {
log.error("Job ERROR", e);
}
}
}
}
12 新建任務
@Slf4j
@Component
public class TestJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext){
// Do what you want here
log.info("Test job is executing at: " + System.currentTimeMillis()/1000);
}
}
13 Controller
@RestController
@RequestMapping("/job")
public class JobController {
@Autowired
private JobService jobService;
@GetMapping
public R getAllJob() {
return R.isOk().data(jobService.getAllJob());
}
@GetMapping("/{id}")
public R getJob(@PathVariable("id") Long jobId) throws ServiceException {
return R.isOk().data(jobService.select(jobId));
}
@PutMapping("/update/{id}")
public R updateJob(@PathVariable("id") Long jobId, @RequestBody ScheduleJob newScheduleJob) throws ServiceException {
return R.isOk().data(jobService.update(jobId, newScheduleJob));
}
@DeleteMapping("/delete/{id}")
public R deleteJob(@PathVariable("id") Long jobId) throws ServiceException {
return R.isOk().data(jobService.delete(jobId));
}
@PostMapping("/add")
public R saveJob(@RequestBody ScheduleJob newScheduleJob) throws ServiceException {
return R.isOk().data(jobService.add(newScheduleJob));
}
@GetMapping("/run/{id}")
public R runJob(@PathVariable("id") Long jobId) throws ServiceException {
return R.isOk().data(jobService.run(jobId));
}
@GetMapping("/pause/{id}")
public R pauseJob(@PathVariable("id") Long jobId) throws ServiceException {
return R.isOk().data(jobService.pause(jobId));
}
@GetMapping("/resume/{id}")
public R resumeJob(@PathVariable("id") Long jobId) throws ServiceException {
return R.isOk().data(jobService.resume(jobId));
}
}
14 測試結果
考慮到要測試的介面很多,這裡推薦
Swagger
Swagger
是一個規範和完整的框架,用於生成、描述、呼叫和視覺化RESTful
風格的Web
服務
新增依賴
<!--swagger2-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.6.1</version>
</dependency>
新增配置檔案
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Value("${swagger.enable:false}")
private boolean enable;
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.enable(enable)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.mkeeper.controller"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("Quartz定時任務單點持久化介面文件")
.description("Quartz定時任務單點持久化")
.version("1.0")
.build();
}
}
application.yml
中開啟Swagger
swagger:
enable: true
15 工程目錄
16 結束語
說點什麼呢,有任何建議,歡迎留言探討,本文原始碼。
歡迎關注博主公眾號,第一時間推送最新文章