1. 程式人生 > >Elastic-Job專案原始碼分析1--核心骨架JobScheduler

Elastic-Job專案原始碼分析1--核心骨架JobScheduler

簡介

我覺得,還是原文寫的好,大家可以去看看:

說下我喜歡的原因吧

  • 可以和現有的工程對接
  • 支援任務分片,這個太重要了!!
  • 彈性,水平擴充套件很好;如果有機器宕機,由其他機器執行
  • 視覺化的、web管理平臺
  • 程式碼架構不錯,雖然各種各樣的service,看起來很令人頭疼

Elastic-job程式入口 –JobScheduler

無論開發還是除錯程式碼,都必須找到程式的入口,否則就是沒有頭的蒼蠅,不知道到來龍去脈
elastic-job-example\elastic-job-example-lite-java\src\main\java\com\dangdang\ddframe\job\example\JavaMain.java

    public static void main(final String[] args) throws IOException {
    // CHECKSTYLE:ON
        EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
        CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
        JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(setUpEventTraceDataSource());
        setUpSimpleJob(regCenter, jobEventConfig);
//        setUpDataflowJob(regCenter, jobEventConfig);
// setUpScriptJob(regCenter, jobEventConfig); }

我們看到這裡啟動了三種Job,分別是SimpleJob、DataflowJob、ScriptJob,這三種job本質上是不同的,其內部實現採用了3中不同的執行器(稍後講解)。

我們先看下setUpSimpleJob

   private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob"
, "0 5 * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName()); new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig, new JavaSimpleListener(), new JavaSimpleDistributeListener(1000L, 2000L)).init(); }

就幾行程式碼,重點來了,JobScheduler這是一個很重要的類!!!裡面建立了很多服務 同時是任務的啟動源頭。在看程式碼時,大家可能看的有點懵逼,建議畫個結構圖,很簡單,就是把核心的類、方法抽取出來,如下圖

jobscheduler-1

幾大服務建立好了,就需要啟動它,就是JobScheduler.init 方法了

jobscheduler-2

這裡是列舉除了主要模組服務。
之前不是說elastic-job是基於quartz之上嘛,那怎麼沒看到quartz?客官稍等就來。
來,我們看看JobScheduler.init

    /**
     * 初始化作業.
     */
    public void init() {
        jobExecutor.init();
        JobTypeConfiguration jobTypeConfig = jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig();
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName);
        jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
        jobRegistry.addJobScheduleController(jobName, jobScheduleController);
    }

看到createScheduler

private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }

    protected Optional<ElasticJob> createElasticJobInstance() {
        return Optional.absent();
    }

    private Scheduler createScheduler(final boolean isMisfire) {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties(isMisfire));
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }

    private Properties getBaseQuartzProperties(final boolean isMisfire) {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        result.put("org.quartz.threadPool.threadCount", "1");
        result.put("org.quartz.scheduler.instanceName", jobName);
        if (!isMisfire) {
            result.put("org.quartz.jobStore.misfireThreshold", "1");
        }
        result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName());
        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
        return result;
    }

看到result.put("org.quartz.threadPool.threadCount", "1"); 這到這個沒,不得不讚亮哥的想法

這裡還有一個亮點就是createJobDetail ,用LiteJob代理了目標類,因為他門的cron是一樣的。我們看下這裡的LiteJob

    /**
     * Lite排程作業.
     * 
     * @author zhangliang
     */
    public static final class LiteJob implements Job {

        @Setter
        private ElasticJob elasticJob;

        @Setter
        private JobFacade jobFacade;

        @Override
        public void execute(final JobExecutionContext context) throws JobExecutionException {
            JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
        }
    }

簡短的令人髮指!!!

看到JobExecutorFactory,就會知道上面三種不同的job,其實是對應了不同的Executor。

好了今天就到此