1. 程式人生 > >Spring Batch 之 Job的建立和呼叫

Spring Batch 之 Job的建立和呼叫

     在上一篇文章  Spring Batch 之 背景框架簡介   中,已經概述了Batch的基本架構組織,並且運行了簡易demo。 在接下來的篇幅中,將逐步介紹每個元件的使用方式,並結合業務進行批處理。

  一個job是如何誕生的?  由什麼組成的?  Spring Batch 又是如何去呼叫執行?

  首先,我們先來了解Job是如何定義與實現:

        一個job可以由一個或多個step組成,通過JobBuilderFactory例項建立Bean,使用next指向下一個step;

package com.batch.demo.flow.jobFlowDemoOne;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobFlowDemoOne {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job JobFlowDemo1(){
        return jobBuilderFactory.get("jobFlowDemo1")
                .start(step1())
                .next(step2())
                .next(step3())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step 1");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((contribution,context)->{
                    System.out.println("step 2");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step 3");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
}

  執行該job,我們可以在控制檯和資料庫中看到相對應的job、step資訊:

2018-12-30 16:47:32.517  INFO 4972 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=jobFlowDemo1]] launched with the following parameters: [{}]
2018-12-30 16:47:32.631  INFO 4972 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
step 1
2018-12-30 16:47:32.742  INFO 4972 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
step 2
2018-12-30 16:47:32.786  INFO 4972 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step3]
step 3
2018-12-30 16:47:32.809  INFO 4972 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=jobFlowDemo1]] completed with the following parameters: [{}] and the following status: [COMPLETED]

STATUS 狀態是 COMPLETED時候,就表明該step已經執行完成。

 

     但是,往往在現實業務處理中,我們希望能夠根據 每個step操作返回的不同狀態,進行判定是否進入下一個step,或者進行其他處理流程。

     所以,此處我們可以修改下job的配置,使它變成有狀態判定的:

    @Bean
    public Job JobFlowDemo1(){
        return jobBuilderFactory.get("jobFlowDemo1")
//                .start(step1())
//                .next(step2())
//                .next(step3())
//                .build();
                .start(step1())
                .on("COMPLETED").to(step2())
                .from(step2()).on("COMPLETED").to(step3())
                .from(step3()).end()
                .build();
    }

      當step1 成功執行完成後,返回COMPLETED, 才呼叫step2進行下一步處理。但是過多的step,不易於程式維護和複用,因此後續篇幅會引入 Spring Batch 之 flow 介紹和使用 。

 

      可能有些同學在一步測試,多次啟動專案時候,就會發現只會在第一次啟動時候成功,剩下都報錯,如下:

2018-12-30 16:59:48.504  INFO 7744 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=jobFlowDemo1]] completed with the following parameters: [{}] and the following status: [COMPLETED]

      這是因為,Spring Batch中相同的Job,當所帶引數一致的時候,有且只會啟動一次。 因此我們可以通過修改job名,或者匯入不同引數進行測試。

      那麼Job 的引數又是如何引入的?  別急,這塊結合接下來馬上進入的 jobLauncher、jobOperator 的demo中,給大家介紹。

 

     在成功建立一個job後,Spring Batch 預設在專案啟動時候執行配置的job。往往在正常業務處理中,需要我們手動或者定時去觸發job,所以這邊便引入了jobLauncher、jobOperator兩個執行器。

在上一篇文章  Spring Batch 之 背景框架簡介  的demo中提到  :

      當application.properties 配置 spring.batch.job.enabled = false 時,即可關閉 Batch自動執行job的操作。

如何配置 jobLauncher、jobOperator?  廢話不多說,我們直接看程式碼。

jobLauncher

     此處我們通過web的API介面去呼叫 jobLauncher,通過介面傳入job的引數。呼叫的Job 是根據 在建立job時候,Bean name去指定。

   @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job jobLaunchDemoJob;

    @GetMapping("/{job1param}")
    public String runJob1(@PathVariable("job1param") String job1param) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        System.out.println("Request to run job1 with param: " + job1param);
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("job1param",job1param)
                .toJobParameters();
        jobLauncher.run(jobLaunchDemoJob,jobParameters);
        return "Job1 success.";

    }

接下來我們看 jobOperator的使用:

    @Autowired
    private JobRepository jobRepository;
    
    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private JobRegistry jobRegistry;

    @Autowired
    private JobLauncher jobLauncher;

    @Bean
    public JobOperator jobOperator(){
        SimpleJobOperator operator = new SimpleJobOperator();

        operator.setJobLauncher(jobLauncher);
        operator.setJobParametersConverter(new DefaultJobParametersConverter());
        operator.setJobRepository(jobRepository);
        operator.setJobExplorer(jobExplorer);
        operator.setJobRegistry(jobRegistry);
        return operator;
    }
    @Autowired
    private JobOperator jobOperator;

    @GetMapping("/{job2param}")
    public String runJob1(@PathVariable("job2param") String job2param) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobInstanceAlreadyExistsException, NoSuchJobException {
        System.out.println("Request to run job2 with param: " + job2param);

        jobOperator.start("jobOperatorDemoJob","job2param="+job2param);

        return "Job2 success.";

    }

最後,定時任務呼叫,熟悉的同學應該已經猜到了:

      通過corn表示式,滿足條件時候,即執行

    @Scheduled(fixedDelay = 5000)
    public void scheduler() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException, JobParametersNotFoundException, NoSuchJobException {
        jobOperator().startNextInstance("jobScheduledDemoJob");
    }

 

補充

   1、通過配置檔案制定資料來源DataSource,Spring Batch 根據sourceTye去指定資料庫型別,執行指令碼。

在多資料來源情況下,預設使用primary。手動指定增加如下配置:

    @Bean
   JobRepository obRepository(PlatformTransactionManager platformTransactionManager){
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDatabaseType(DatabaseType.MYSQL.getProductName());
        //配置指定的dataSource
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(platformTransactionManager);
        return jobRepositoryFactoryBean.getObject();
   }

 

    2、job的引數是在整個job的step的生命週期中都可以使用到,我們可以根據不同業務處理邏輯,傳入所需引數。 

呼叫過程,demo如下:

package com.batch.demo.flow.jobParametersDemo;

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Configuration
public class JobParametersDemoConfiguration implements StepExecutionListener{
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private Map<String, JobParameter> params;

    @Bean
    public Job myJobParametersDemoJob(){
        return jobBuilderFactory.get("myJobParametersDemoJob")
                .start(myJobParametersDemoStep())
                .build();
    }

    @Bean
    public Step myJobParametersDemoStep() {
        return stepBuilderFactory.get("myJobParametersDemoStep")
                .listener(this)
                .tasklet(((contribution, chunkContext) -> {
                    System.out.println("Parameter is : " + params.get("info"));
                    return RepeatStatus.FINISHED;
                })).build();

    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        params =  stepExecution.getJobParameters().getParameters();

    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}

    demo中使用到了 step的監聽器,在後續章節會逐步講解,此處不予細講。