1. 程式人生 > springboot使用Quartz定時器

springboot使用Quartz定時器

<!--
  Maven dependency on the Spring Boot Quartz starter, used by the scheduler
  configuration and controller shown below.
  No <version> is given here; presumably it is managed by the Spring Boot
  parent POM / BOM of the project (confirm against the full pom.xml).
-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
import org.quartz.Scheduler;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;

/**
 * @author 
 */
@Configuration
public class QuartzConfig {

    @Component
    public class QuartzJobFactory extends AdaptableJobFactory {

        @Autowired
        private AutowireCapableBeanFactory capableBeanFactory;

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            capableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }

    @Bean
    public Scheduler scheduler(QuartzJobFactory quartzJobFactory) throws Exception {
        SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
        factoryBean.setJobFactory(quartzJobFactory);
        factoryBean.afterPropertiesSet();
        Scheduler scheduler = factoryBean.getScheduler();
        scheduler.start();
        return scheduler;
    }
}
import org.quartz.JobDataMap;

import java.io.Serializable;

/**
 * Transfer object describing a scheduled job and its trigger, used both as
 * request input and as response payload of the scheduler endpoints.
 */
public class JobDTO implements Serializable {

    /**
     * Fully qualified class name (package + class) of the job to run,
     * e.g. {@code tech.elt.scheduler.UpdateSearchIndexJob}.
     */
    private String beanName;

    /**
     * Cron expression for the trigger.
     * In Quartz cron syntax {@code "0 34 11 * * ?"} fires every day at 11:34:00.
     */
    private String cronExpression;

    /** Custom trigger name. */
    private String triggerName;

    /** Custom job name. */
    private String jobName;

    /** Name of the trigger state, as reported by the scheduler. */
    private String jobStatus;

    /** Data map attached to the trigger. */
    private JobDataMap jobDataMap;

    /**
     * Custom field: city code. When empty everything is updated, otherwise
     * only data whose site starts with this value.
     */
    private String site;

    // ---- job class / schedule ----

    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    public String getCronExpression() {
        return this.cronExpression;
    }

    public void setCronExpression(String cronExpression) {
        this.cronExpression = cronExpression;
    }

    // ---- identity ----

    public String getTriggerName() {
        return this.triggerName;
    }

    public void setTriggerName(String triggerName) {
        this.triggerName = triggerName;
    }

    public String getJobName() {
        return this.jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    // ---- state / payload ----

    public String getJobStatus() {
        return this.jobStatus;
    }

    public void setJobStatus(String jobStatus) {
        this.jobStatus = jobStatus;
    }

    public JobDataMap getJobDataMap() {
        return this.jobDataMap;
    }

    public void setJobDataMap(JobDataMap jobDataMap) {
        this.jobDataMap = jobDataMap;
    }

    public String getSite() {
        return this.site;
    }

    public void setSite(String site) {
        this.site = site;
    }

    /**
     * Renders all fields as {@code JobDTO{name='value', ...}}; string fields
     * are wrapped in single quotes, the data map is printed as-is.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("JobDTO{");
        text.append("beanName='").append(beanName).append('\'');
        text.append(", cronExpression='").append(cronExpression).append('\'');
        text.append(", triggerName='").append(triggerName).append('\'');
        text.append(", jobName='").append(jobName).append('\'');
        text.append(", jobStatus='").append(jobStatus).append('\'');
        text.append(", jobDataMap=").append(jobDataMap);
        text.append(", site='").append(site).append('\'');
        text.append('}');
        return text.toString();
    }
}

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tech.liveeasy.platform.kapok.elt.constant.es.IndexAndTypeConstant;
import tech.liveeasy.platform.kapok.elt.constant.es.PropertyIndexMappingConstant;

import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Quartz job that refreshes the {@code count} field of the documents in the
 * search index.
 *
 * <p>It aggregates the property index by estate id, then writes each bucket's
 * document count into the search-index document whose id is that estate id.
 * The optional {@code site} entry of the trigger's job data map restricts the
 * aggregation to documents whose {@code site} field starts with that value;
 * when it is blank, all documents are aggregated.
 */
@Component
public class UpdateSearchIndexJob implements Job {

    /** Name of the terms aggregation used to group documents by estate id. */
    private static final String ESTATE_ID_AGG = "estateIdAgg";

    @Autowired
    private RestHighLevelClient client;

    /**
     * Runs one refresh. Failures are reported on {@code System.err} and never
     * propagate to Quartz; a failed update of a single document does not stop
     * the remaining documents from being updated.
     *
     * @param context execution context; the trigger's job data map may carry a
     *                {@code site} prefix filter
     */
    @Override
    public void execute(JobExecutionContext context) {
        String site = context.getTrigger().getJobDataMap().getString("site");
        try {
            refreshCounts(site);
        } catch (Exception e) {
            // Anything that fails outside a single-document update (building or
            // running the aggregation, reading its result) ends the run.
            System.err.println("UpdateSearchIndexJob執行失敗 site: " + site);
            e.printStackTrace();
        }
        System.err.println("UpdateSearchIndexJob執行結束!");
    }

    /** Runs the estate-id aggregation and pushes every bucket's count to the search index. */
    private void refreshCounts(String site) throws Exception {
        SearchResponse searchResponse = client.search(buildAggregationRequest(site));
        Map<String, Aggregation> aggMap = searchResponse.getAggregations().getAsMap();
        ParsedStringTerms estateTerms = (ParsedStringTerms) aggMap.get(ESTATE_ID_AGG);
        for (Object bucket : estateTerms.getBuckets()) {
            ParsedStringTerms.ParsedBucket estateBucket = (ParsedStringTerms.ParsedBucket) bucket;
            updateCount(estateBucket.getKeyAsString(), estateBucket.getDocCount());
        }
    }

    /** Builds the hit-less search request that groups property documents by estate id. */
    private SearchRequest buildAggregationRequest(String site) {
        SearchRequest request = new SearchRequest(IndexAndTypeConstant.PROPERTY_SQL);
        request.types(IndexAndTypeConstant.PROPERTY_SQL_TYPE);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // Only the aggregation result is needed, so no hits are requested.
        searchSourceBuilder.size(0);

        if (StringUtils.isNotBlank(site)) {
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
            boolQueryBuilder.must(QueryBuilders.prefixQuery("site", site));
            searchSourceBuilder.query(boolQueryBuilder);
        }
        System.err.println("site=================================" + site);

        TermsAggregationBuilder aggregationBuilder =
                AggregationBuilders.terms(ESTATE_ID_AGG).field(PropertyIndexMappingConstant.ESTATE_ID);
        aggregationBuilder.order(BucketOrder.count(false));
        // TODO (carried over from the original): Integer.MAX_VALUE asks for every
        // bucket in one response. NOTE(review): this may be expensive on a large
        // index - confirm whether paging the aggregation is needed.
        aggregationBuilder.size(Integer.MAX_VALUE);
        searchSourceBuilder.aggregation(aggregationBuilder);

        request.source(searchSourceBuilder);
        return request;
    }

    /**
     * Writes {@code count} into the search-index document with the given id.
     * A failure is reported together with its cause and then swallowed so that
     * the caller can continue with the next document.
     */
    private void updateCount(String estateId, long count) {
        try {
            UpdateRequest updateRequest = new UpdateRequest(IndexAndTypeConstant.SEARCH_INDEX,
                    IndexAndTypeConstant.SEARCH_TYPE, estateId)
                    .doc(jsonBuilder()
                            .startObject()
                            .field("count", count)
                            .endObject());
            client.update(updateRequest);
        } catch (Exception e) {
            System.err.println("本條資料插入失敗 id: " + estateId);
            e.printStackTrace();
        }
    }
}

import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import tech.liveeasy.platform.kapok.elt.constant.JsonResult;
import tech.liveeasy.platform.kapok.elt.constant.ResultCode;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;

/**
 * REST endpoints for managing Quartz jobs at runtime: add, reschedule, delete,
 * list, trigger, pause and resume.
 *
 * <p>Jobs and triggers are addressed by name only, i.e. through
 * {@link JobKey#jobKey(String)} and {@link TriggerKey#triggerKey(String)}.
 *
 * <p>NOTE(review): every endpoint, including the state-changing ones, is mapped
 * as GET, and {@code /add} loads a class whose name comes from the request.
 * Confirm that these endpoints are not reachable by untrusted callers.
 */
@RestController
@RequestMapping("/scheduler")
public class QuartzManageController {

    @Autowired
    private Scheduler scheduler;

    /**
     * Reschedules an existing cron trigger with a new cron expression.
     *
     * @param jobDTO required: {@code triggerName}, {@code cronExpression};
     *               optional: {@code site} (city code), stored in the trigger's job data
     * @return success result carrying {@code jobDTO}, or an error result when the
     *         trigger does not exist, is not a cron trigger, or rescheduling fails
     * @throws SchedulerException if the existing trigger cannot be looked up
     */
    @GetMapping({"/update"})
    public JsonResult update(JobDTO jobDTO) throws SchedulerException {
        TriggerKey triggerKey = TriggerKey.triggerKey(jobDTO.getTriggerName());
        Trigger existing = scheduler.getTrigger(triggerKey);
        // getTrigger yields null for an unknown key; without this guard the
        // request would fail with a NullPointerException / ClassCastException.
        if (!(existing instanceof CronTrigger)) {
            return new JsonResult(ResultCode.SYS_ERROR, "異常",
                    "No cron trigger found with name: " + jobDTO.getTriggerName());
        }
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(jobDTO.getCronExpression());
        String site = jobDTO.getSite();
        CronTrigger trigger = ((CronTrigger) existing).getTriggerBuilder()
                .withIdentity(triggerKey)
                .usingJobData("site", site)
                .withSchedule(scheduleBuilder)
                .build();
        try {
            scheduler.rescheduleJob(triggerKey, trigger);
        } catch (Exception e) {
            return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
        }
        return new JsonResult(ResultCode.SUCCESS, "成功", jobDTO);
    }

    /**
     * Deletes a job.
     *
     * @param jobName name of the job to delete
     * @return success result carrying the boolean returned by the scheduler
     * @throws SchedulerException if the scheduler fails to delete the job
     */
    @GetMapping({"/delete"})
    public JsonResult deleteJob(String jobName) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobName);
        boolean deleted = scheduler.deleteJob(jobKey);
        return new JsonResult(ResultCode.SUCCESS, "成功", deleted);
    }

    /**
     * Adds a new job with a cron trigger.
     *
     * <p>The job class is resolved from {@code beanName} and only checked to be
     * a {@link Job}; no instance is created here, the scheduler's job factory
     * does that when the trigger fires.
     *
     * @param job {@code beanName} (fully qualified job class), {@code jobName},
     *            {@code triggerName}, {@code cronExpression} and optional {@code site}
     * @return success result carrying {@code job}, or an error result when a job
     *         or trigger with the same identity already exists
     * @throws SchedulerException     if scheduling fails for another reason
     * @throws ClassNotFoundException if {@code beanName} cannot be loaded
     * @throws ClassCastException     if the named class does not implement {@link Job}
     */
    @GetMapping({"/add"})
    public JsonResult addJob(JobDTO job) throws SchedulerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
        Class<? extends Job> jobClass = Class.forName(job.getBeanName()).asSubclass(Job.class);
        JobDetail jobDetail = newJob(jobClass).withIdentity(job.getJobName()).build();
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
        String site = job.getSite();
        Trigger trigger = newTrigger()
                .withIdentity(job.getTriggerName())
                .usingJobData("site", site)
                .startNow()
                .withSchedule(scheduleBuilder)
                .build();
        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (ObjectAlreadyExistsException e) {
            String result = "Unable to store Job, because one already exists with this identification.";
            return new JsonResult(ResultCode.SYS_ERROR, "異常", result);
        }
        return new JsonResult(ResultCode.SUCCESS, "成功", job);
    }

    /**
     * Lists all planned jobs, one entry per (job, trigger) pair.
     *
     * @return success result carrying the list, or an error result if the
     *         scheduler cannot be queried
     */
    @GetMapping({"/plans"})
    public JsonResult planJobs() {
        List<JobDTO> jobList = new ArrayList<>();
        try {
            GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
            Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
            for (JobKey jobKey : jobKeys) {
                for (Trigger trigger : scheduler.getTriggersOfJob(jobKey)) {
                    jobList.add(toJobDTO(jobKey, trigger));
                }
            }
        } catch (Exception e) {
            // Report the failure instead of answering SUCCESS with a partial list.
            return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
        }
        return new JsonResult(ResultCode.SUCCESS, "成功", jobList);
    }

    /**
     * Lists the jobs that are executing right now.
     *
     * @return success result carrying the list, or an error result if the
     *         scheduler cannot be queried
     */
    @GetMapping({"/runs"})
    public JsonResult runJobs() {
        try {
            List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
            List<JobDTO> jobList = new ArrayList<>(executingJobs.size());
            for (JobExecutionContext executingJob : executingJobs) {
                jobList.add(toJobDTO(executingJob.getJobDetail().getKey(), executingJob.getTrigger()));
            }
            return new JsonResult(ResultCode.SUCCESS, "成功", jobList);
        } catch (Exception e) {
            // Report the failure instead of answering with an empty result object.
            return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
        }
    }

    /**
     * Triggers a job immediately.
     *
     * @param jobName name of the job to run
     * @return success result, or an error result carrying the failure message
     */
    @GetMapping({"/runNow"})
    public JsonResult runAJobNow(String jobName) {
        JobKey jobKey = JobKey.jobKey(jobName);
        try {
            scheduler.triggerJob(jobKey);
        } catch (Exception e) {
            return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
        }
        return new JsonResult(ResultCode.SUCCESS, "成功");
    }

    /**
     * Pauses a job; the counterpart of {@link #resumeJob(String)}.
     *
     * @param jobName name of the job to pause
     * @return success result, or an error result carrying the failure message
     */
    @GetMapping({"/pause"})
    public JsonResult pauseJob(String jobName) {
        JobKey jobKey = JobKey.jobKey(jobName);
        try {
            scheduler.pauseJob(jobKey);
        } catch (Exception e) {
            return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
        }
        return new JsonResult(ResultCode.SUCCESS, "成功");
    }

    /**
     * Resumes a paused job; the counterpart of {@link #pauseJob(String)}.
     *
     * @param jobName name of the job to resume
     * @return success result, or an error result carrying the failure message
     */
    @GetMapping({"/resume"})
    public JsonResult resumeJob(String jobName) {
        JobKey jobKey = JobKey.jobKey(jobName);
        try {
            scheduler.resumeJob(jobKey);
        } catch (Exception e) {
            return new JsonResult(ResultCode.SYS_ERROR, "異常", e.getMessage());
        }
        return new JsonResult(ResultCode.SUCCESS, "成功");
    }

    /**
     * Maps a job key and one of its triggers to a {@link JobDTO}: job name,
     * trigger name, trigger job data, current trigger state and, for cron
     * triggers, the cron expression.
     */
    private JobDTO toJobDTO(JobKey jobKey, Trigger trigger) throws SchedulerException {
        JobDTO job = new JobDTO();
        job.setJobName(jobKey.getName());
        job.setTriggerName(trigger.getKey().getName());
        job.setJobDataMap(trigger.getJobDataMap());
        Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
        job.setJobStatus(triggerState.name());
        if (trigger instanceof CronTrigger) {
            job.setCronExpression(((CronTrigger) trigger).getCronExpression());
        }
        return job;
    }
}