Spring Batch 之 Job的建立和呼叫
在上一篇文章 Spring Batch 之 背景框架簡介 中,已經概述了Batch的基本架構組織,並且運行了簡易demo。 在接下來的篇幅中,將逐步介紹每個元件的使用方式,並結合業務進行批處理。
一個job是如何誕生的? 由什麼組成的? Spring Batch 又是如何去呼叫執行?
首先,我們先來了解Job是如何定義與實現:
一個job可以由一個或多個step組成,通過JobBuilderFactory例項建立Bean,使用next指向下一個step;
package com.batch.demo.flow.jobFlowDemoOne; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class JobFlowDemoOne { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job JobFlowDemo1(){ return jobBuilderFactory.get("jobFlowDemo1") .start(step1()) .next(step2()) .next(step3()) .build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("step 1"); return RepeatStatus.FINISHED; } }).build(); } @Bean public Step step2() { return stepBuilderFactory.get("step2") .tasklet((contribution,context)->{ System.out.println("step 2"); return RepeatStatus.FINISHED; }).build(); } @Bean public Step step3() { return stepBuilderFactory.get("step3") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("step 3"); return RepeatStatus.FINISHED; } }).build(); } }
執行該job,我們可以在控制檯和資料庫中看到相對應的job、step資訊:
2018-12-30 16:47:32.517 INFO 4972 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=jobFlowDemo1]] launched with the following parameters: [{}] 2018-12-30 16:47:32.631 INFO 4972 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1] step 1 2018-12-30 16:47:32.742 INFO 4972 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2] step 2 2018-12-30 16:47:32.786 INFO 4972 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step3] step 3 2018-12-30 16:47:32.809 INFO 4972 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=jobFlowDemo1]] completed with the following parameters: [{}] and the following status: [COMPLETED]
STATUS 狀態是 COMPLETED時候,就表明該step已經執行完成。
但是,往往在現實業務處理中,我們希望能夠根據 每個step操作返回的不同狀態,進行判定是否進入下一個step,或者進行其他處理流程。
所以,此處我們可以修改下job的配置,使它變成有狀態判定的:
@Bean
public Job JobFlowDemo1(){
return jobBuilderFactory.get("jobFlowDemo1")
// .start(step1())
// .next(step2())
// .next(step3())
// .build();
.start(step1())
.on("COMPLETED").to(step2())
.from(step2()).on("COMPLETED").to(step3())
.from(step3()).end()
.build();
}
當step1 成功執行完成後,返回COMPLETED, 才呼叫step2進行下一步處理。但是過多的step,不易於程式維護和複用,因此後續篇幅會引入 Spring Batch 之 flow 介紹和使用 。
可能有些同學在一步測試,多次啟動專案時候,就會發現只會在第一次啟動時候成功,剩下都報錯,如下:
2018-12-30 16:59:48.504 INFO 7744 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=jobFlowDemo1]] completed with the following parameters: [{}] and the following status: [COMPLETED]
這是因為,Spring Batch中相同的Job,當所帶引數一致的時候,有且只會啟動一次。 因此我們可以通過修改job名,或者匯入不同引數進行測試。
那麼Job 的引數又是如何引入的? 別急,這塊結合接下來馬上進入的 jobLauncher、jobOperator 的demo中,給大家介紹。
在成功建立一個job後,Spring Batch 預設在專案啟動時候執行配置的job。往往在正常業務處理中,需要我們手動或者定時去觸發job,所以這邊便引入了jobLauncher、jobOperator兩個執行器。
在上一篇文章 Spring Batch 之 背景框架簡介 的demo中提到 :
當application.properties 配置 spring.batch.job.enabled = false 時,即可關閉 Batch自動執行job的操作。
如何配置 jobLauncher、jobOperator? 廢話不多說,我們直接看程式碼。
jobLauncher
此處我們通過web的API介面去呼叫 jobLauncher,通過介面傳入job的引數。呼叫的Job 是根據 在建立job時候,Bean name去指定。
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job jobLaunchDemoJob;
@GetMapping("/{job1param}")
public String runJob1(@PathVariable("job1param") String job1param) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
System.out.println("Request to run job1 with param: " + job1param);
JobParameters jobParameters = new JobParametersBuilder()
.addString("job1param",job1param)
.toJobParameters();
jobLauncher.run(jobLaunchDemoJob,jobParameters);
return "Job1 success.";
}
接下來我們看 jobOperator的使用:
@Autowired
private JobRepository jobRepository;
@Autowired
private JobExplorer jobExplorer;
@Autowired
private JobRegistry jobRegistry;
@Autowired
private JobLauncher jobLauncher;
@Bean
public JobOperator jobOperator(){
SimpleJobOperator operator = new SimpleJobOperator();
operator.setJobLauncher(jobLauncher);
operator.setJobParametersConverter(new DefaultJobParametersConverter());
operator.setJobRepository(jobRepository);
operator.setJobExplorer(jobExplorer);
operator.setJobRegistry(jobRegistry);
return operator;
}
@Autowired
private JobOperator jobOperator;
@GetMapping("/{job2param}")
public String runJob1(@PathVariable("job2param") String job2param) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobInstanceAlreadyExistsException, NoSuchJobException {
System.out.println("Request to run job2 with param: " + job2param);
jobOperator.start("jobOperatorDemoJob","job2param="+job2param);
return "Job2 success.";
}
最後,定時任務呼叫,熟悉的同學應該已經猜到了:
通過corn表示式,滿足條件時候,即執行
@Scheduled(fixedDelay = 5000)
public void scheduler() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException, JobParametersNotFoundException, NoSuchJobException {
jobOperator().startNextInstance("jobScheduledDemoJob");
}
補充:
1、通過配置檔案制定資料來源DataSource,Spring Batch 根據sourceTye去指定資料庫型別,執行指令碼。
在多資料來源情況下,預設使用primary。手動指定增加如下配置:
@Bean
JobRepository obRepository(PlatformTransactionManager platformTransactionManager){
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDatabaseType(DatabaseType.MYSQL.getProductName());
//配置指定的dataSource
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(platformTransactionManager);
return jobRepositoryFactoryBean.getObject();
}
2、job的引數是在整個job的step的生命週期中都可以使用到,我們可以根據不同業務處理邏輯,傳入所需引數。
呼叫過程,demo如下:
package com.batch.demo.flow.jobParametersDemo;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
@Configuration
public class JobParametersDemoConfiguration implements StepExecutionListener{
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private Map<String, JobParameter> params;
@Bean
public Job myJobParametersDemoJob(){
return jobBuilderFactory.get("myJobParametersDemoJob")
.start(myJobParametersDemoStep())
.build();
}
@Bean
public Step myJobParametersDemoStep() {
return stepBuilderFactory.get("myJobParametersDemoStep")
.listener(this)
.tasklet(((contribution, chunkContext) -> {
System.out.println("Parameter is : " + params.get("info"));
return RepeatStatus.FINISHED;
})).build();
}
@Override
public void beforeStep(StepExecution stepExecution) {
params = stepExecution.getJobParameters().getParameters();
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}
demo中使用到了 step的監聽器,在後續章節會逐步講解,此處不予細講。