springboot使用Quartz定時器
阿新 • • 發佈:2018-11-29
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
import org.quartz.Scheduler; import org.quartz.spi.TriggerFiredBundle; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.quartz.AdaptableJobFactory; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Component; /** * @author */ @Configuration public class QuartzConfig { @Component public class QuartzJobFactory extends AdaptableJobFactory { @Autowired private AutowireCapableBeanFactory capableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { Object jobInstance = super.createJobInstance(bundle); capableBeanFactory.autowireBean(jobInstance); return jobInstance; } } @Bean public Scheduler scheduler(QuartzJobFactory quartzJobFactory) throws Exception { SchedulerFactoryBean factoryBean = new SchedulerFactoryBean(); factoryBean.setJobFactory(quartzJobFactory); factoryBean.afterPropertiesSet(); Scheduler scheduler = factoryBean.getScheduler(); scheduler.start(); return scheduler; } }
import org.quartz.JobDataMap; import java.io.Serializable; /** * @author */ public class JobDTO implements Serializable { /** * 任務執行時呼叫哪個類的方法 包名+類名,完全限定名 * eg: tech.elt.scheduler.UpdateSearchIndexJob */ private String beanName; /** * cron 表示式 * "0 34 11 * * ? " 每天0點 */ private String cronExpression; /** * 自定義觸發器名 */ private String triggerName; /** * 自定義任務名 */ private String jobName; private String jobStatus; private JobDataMap jobDataMap; /** * 自定義欄位 城市code 為空則更新全部 否則更新以site開頭的資料 */ private String site; public String getBeanName() { return beanName; } public void setBeanName(String beanName) { this.beanName = beanName; } public String getCronExpression() { return cronExpression; } public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; } public String getTriggerName() { return triggerName; } public void setTriggerName(String triggerName) { this.triggerName = triggerName; } public String getJobName() { return jobName; } public void setJobName(String jobName) { this.jobName = jobName; } public String getJobStatus() { return jobStatus; } public void setJobStatus(String jobStatus) { this.jobStatus = jobStatus; } public JobDataMap getJobDataMap() { return jobDataMap; } public void setJobDataMap(JobDataMap jobDataMap) { this.jobDataMap = jobDataMap; } public String getSite() { return site; } public void setSite(String site) { this.site = site; } @Override public String toString() { return "JobDTO{" + "beanName='" + beanName + '\'' + ", cronExpression='" + cronExpression + '\'' + ", triggerName='" + triggerName + '\'' + ", jobName='" + jobName + '\'' + ", jobStatus='" + jobStatus + '\'' + ", jobDataMap=" + jobDataMap + ", site='" + site + '\'' + '}'; } }
import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import tech.liveeasy.platform.kapok.elt.constant.es.IndexAndTypeConstant; import tech.liveeasy.platform.kapok.elt.constant.es.PropertyIndexMappingConstant; import java.util.List; import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** * 更新search_index的count值 * @author 王 */ @Component public class UpdateSearchIndexJob implements Job { @Autowired private RestHighLevelClient client; @Override public void execute(JobExecutionContext context) { String site = context.getTrigger().getJobDataMap().getString("site"); String id = null; try { SearchRequest request = new SearchRequest(IndexAndTypeConstant.PROPERTY_SQL); request.types(IndexAndTypeConstant.PROPERTY_SQL_TYPE); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.size(0); if (StringUtils.isNotBlank(site)) { BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.prefixQuery("site", site)); searchSourceBuilder.query(boolQueryBuilder); } System.err.println("site=================================" + site); TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("estateIdAgg").field(PropertyIndexMappingConstant.ESTATE_ID); aggregationBuilder.order(BucketOrder.count(false)); //todo aggregationBuilder.size(Integer.MAX_VALUE); searchSourceBuilder.aggregation(aggregationBuilder); request.source(searchSourceBuilder); SearchResponse searchResponse = client.search(request); Map<String, Aggregation> aggMap = searchResponse.getAggregations().getAsMap(); ParsedStringTerms gradeTerms = (ParsedStringTerms) aggMap.get("estateIdAgg"); List list = gradeTerms.getBuckets(); for (Object object : list) { ParsedStringTerms.ParsedBucket obj = (ParsedStringTerms.ParsedBucket) object; String key = obj.getKeyAsString(); long count = obj.getDocCount(); id = key; UpdateRequest updateRequest = new UpdateRequest(IndexAndTypeConstant.SEARCH_INDEX, IndexAndTypeConstant.SEARCH_TYPE, key) .doc(jsonBuilder() .startObject() .field("count", count) .endObject()); client.update(updateRequest); } } catch (Exception e) { System.err.println("本條資料插入失敗 id: " + id); } System.err.println("UpdateSearchIndexJob執行結束!"); } }
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import tech.liveeasy.platform.kapok.elt.constant.JsonResult;
import tech.liveeasy.platform.kapok.elt.constant.ResultCode;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
/**
* @author
*/
@RestController
@RequestMapping("/scheduler")
public class QuartzManageController {
@Autowired
private Scheduler scheduler;
/**
* 更新任務
*
* @param jobDTO 必選引數 triggerName cronExpression
* 可選 site 城市編碼
* @throws SchedulerException
*/
@GetMapping({"/update"})
public JsonResult update(JobDTO jobDTO) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(jobDTO.getTriggerName());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(jobDTO.getCronExpression());
String site = jobDTO.getSite();
trigger = trigger.getTriggerBuilder()
.withIdentity(triggerKey)
.usingJobData("site", site)
.withSchedule(scheduleBuilder)
.build();
try {
scheduler.rescheduleJob(triggerKey, trigger);
} catch (Exception e) {
return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
}
return new JsonResult(ResultCode.SUCCESS, "成功", jobDTO);
}
/**
* 刪除job
*
* @param jobName
* @throws SchedulerException
*/
@GetMapping({"/delete"})
public JsonResult deleteJob(String jobName) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName);
boolean b = scheduler.deleteJob(jobKey);
return new JsonResult(ResultCode.SUCCESS, "成功", b);
}
/**
* 新加任務
*
* @param job
* @throws SchedulerException
*/
@GetMapping({"/add"})
public JsonResult addJob(JobDTO job) throws SchedulerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
Class<?> clazz = Class.forName(job.getBeanName());
Job jobEntity = (Job) clazz.newInstance();
JobDetail jobDetail = newJob(jobEntity.getClass()).withIdentity(job.getJobName()).build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
String site = job.getSite();
Trigger trigger = newTrigger()
.withIdentity(job.getTriggerName())
.usingJobData("site", site)
.startNow()
.withSchedule(scheduleBuilder)
.build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (ObjectAlreadyExistsException e) {
String result = "Unable to store Job, because one already exists with this identification.";
return new JsonResult(ResultCode.SYS_ERROR, "異常", result);
}
return new JsonResult(ResultCode.SUCCESS, "成功", job);
}
/**
* 檢視計劃中的任務
*/
@GetMapping({"/plans"})
public JsonResult planJobs() {
List<JobDTO> jobList = new ArrayList<>();
try {
GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
for (JobKey jobKey : jobKeys) {
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
JobDTO job = new JobDTO();
job.setJobName(jobKey.getName());
job.setTriggerName(trigger.getKey().getName());
job.setJobDataMap(trigger.getJobDataMap());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
}
} catch (Exception e) {
new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
}
return new JsonResult(ResultCode.SUCCESS, "成功", jobList);
}
/**
* 檢視執行中的任務
*/
@GetMapping({"/runs"})
public JsonResult runJobs() {
try {
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
List<JobDTO> jobList = new ArrayList<>(executingJobs.size());
for (JobExecutionContext executingJob : executingJobs) {
JobDTO job = new JobDTO();
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger();
job.setJobName(jobKey.getName());
job.setTriggerName(trigger.getKey().getName());
job.setJobDataMap(trigger.getJobDataMap());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
return new JsonResult(ResultCode.SUCCESS, "成功", jobList);
} catch (Exception e) {
new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
}
return new JsonResult();
}
/**
* 立即執行job
*
* @param jobName
*/
@GetMapping({"/runNow"})
public JsonResult runAJobNow(String jobName) {
JobKey jobKey = JobKey.jobKey(jobName);
try {
scheduler.triggerJob(jobKey);
} catch (Exception e) {
return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
}
return new JsonResult(ResultCode.SUCCESS, "成功");
}
/**
* 暫停job
*
* @param jobName
* @Description 和恢復job相對
*/
@GetMapping({"/pause"})
public JsonResult pauseJob(String jobName) {
JobKey jobKey = JobKey.jobKey(jobName);
try {
scheduler.pauseJob(jobKey);
} catch (Exception e) {
return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
}
return new JsonResult(ResultCode.SUCCESS, "成功");
}
/**
* 恢復job
*
* @param jobName
* @Description 和暫停任務相對
*/
@GetMapping({"/resume"})
public JsonResult resumeJob(String jobName) {
JobKey jobKey = JobKey.jobKey(jobName);
try {
scheduler.resumeJob(jobKey);
} catch (Exception e) {
return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
}
return new JsonResult(ResultCode.SUCCESS, "成功");
}
}