Integrating Spring Batch with Spring Boot 2.0.3
阿新 • Published: 2018-12-07
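Spring Batch is a good choice for processing large volumes of data. Because my company's overall architecture is built on Spring Boot, I looked into how the two fit together.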
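1. On the official site http://start.spring.io/, select the MySQL, Batch, and Web dependencies.
2. Define a custom MyBatchConfig class and annotate it with @Configuration (marks it as a configuration class) and @EnableBatchProcessing (enables Spring Batch). The code is as follows: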
package com.kmm.config;

import com.kmm.bean.Person;
import com.kmm.listener.MyJobListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.*;
import org.springframework.batch.support.DatabaseType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.List;

@Configuration
@EnableBatchProcessing
public class MyBatchConfig {

    // Demo reader: returns a new Person on every call.
    // Note: a step only finishes once read() returns null, so a real reader
    // must return null when its input is exhausted.
    @Bean
    public ItemReader<Person> reader() throws Exception {
        ItemReader<Person> itemReader = new ItemReader<Person>() {
            @Override
            public Person read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                System.out.println("reader");
                return new Person();
            }
        };
        return itemReader;
    }

    // Demo processor: logs the call and passes a Person on to the writer
    @Bean
    public ItemProcessor<Person, Person> processor() {
        ItemProcessor<Person, Person> processor = new ItemProcessor<Person, Person>() {
            @Override
            public Person process(Person person) throws Exception {
                System.out.println("processor");
                return new Person();
            }
        };
        return processor;
    }

    // Demo writer: called once per chunk with the list of processed items
    @Bean
    public ItemWriter<Person> writer() {
        ItemWriter<Person> itemWriter = new ItemWriter<Person>() {
            @Override
            public void write(List<? extends Person> list) throws Exception {
                System.out.println("writer");
            }
        };
        return itemWriter;
    }

    // Stores the job metadata in the MySQL data source
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType(DatabaseType.MYSQL.name());
        return jobRepositoryFactoryBean.getObject();
    }

    // Must be a bean: the controller autowires SimpleJobLauncher
    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(this.jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job myJob(JobBuilderFactory jobs, Step step) {
        return jobs.get("myJob1")
                .incrementer(new RunIdIncrementer())
                .flow(step)                  // assign the Step to the Job
                .end()
                .listener(myJobListener())   // bind the listener
                .build();
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,
                     ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory.get("MyStep")
                .<Person, Person>chunk(5000) // commit once every 5000 items
                .reader(reader)              // bind the reader to the step
                .processor(processor)        // bind the processor to the step
                .writer(writer)              // bind the writer to the step
                .build();
    }

    @Bean
    public MyJobListener myJobListener() {
        return new MyJobListener();
    }
}
A Batch step runs in this order: reader, processor, writer.
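To make the reader, processor, writer order concrete, here is a minimal plain-Java sketch of a chunk loop. It is not Spring Batch's implementation: the class ChunkLoopSketch and the method runStep are hypothetical names made up for illustration, and the sketch only assumes the documented conventions that a reader signals the end of input by returning null and that the writer receives one list per chunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Spring Batch.
public class ChunkLoopSketch {

    /**
     * Runs a simplified chunk loop and returns the number of chunks written.
     * The reader returns null at end of input; a processor result of null
     * drops the item (it is not passed to the writer).
     */
    public static <I, O> int runStep(Supplier<I> reader, Function<I, O> processor,
                                     Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        boolean exhausted = false;
        while (!exhausted) {
            // 1. Read up to chunkSize items, stopping early at end of input.
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize) {
                I item = reader.get();
                if (item == null) {
                    exhausted = true;
                    break;
                }
                items.add(item);
            }
            // 2. Process each item that was read.
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Write the whole chunk in one call.
            if (!items.isEmpty()) {
                writer.accept(outputs);
                chunks++;
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        Iterator<String> source = Arrays.asList("a", "b", "c", "d", "e").iterator();
        List<List<String>> written = new ArrayList<>();

        Supplier<String> reader = () -> source.hasNext() ? source.next() : null;
        Function<String, String> processor = String::toUpperCase;
        Consumer<List<String>> writer = written::add;

        int chunks = runStep(reader, processor, writer, 2);
        System.out.println(chunks + " chunks: " + written);
        // prints "3 chunks: [[A, B], [C, D], [E]]"
    }
}
```

With a chunk size of 2 and five items, the writer is called three times, the last time with a partial chunk. The same idea applies to chunk(5000) in the configuration: items are read and processed one by one, but written and committed in groups.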
The processor can also be a custom class, for example:
import com.kmm.bean.Person;
import org.springframework.batch.item.ItemProcessor;

public class MyProcessor implements ItemProcessor<Person, Person> {

    // Returns the item unchanged; put transformation logic here
    @Override
    public Person process(Person person) throws Exception {
        return person;
    }
}
The same applies to the reader and the writer.
3. Use a Controller to test the job:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job importJob;

    @RequestMapping("/test")
    public void imp() throws Exception {
        // Built per request, so concurrent requests do not share state
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importJob, jobParameters);
    }
}
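The controller passes System.currentTimeMillis() as a "time" parameter because Spring Batch identifies a job instance by the job name plus its identifying parameters, and it refuses to run an instance that has already completed. The sketch below is only a toy model of that rule in plain Java, not Spring Batch code: JobInstanceSketch and launch are hypothetical names, and where Spring Batch would throw JobInstanceAlreadyCompleteException, this model simply returns false.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy model of job instance identity; not part of Spring Batch.
public class JobInstanceSketch {

    // Keys of job instances that have already completed.
    private final Set<String> completed = new HashSet<>();

    /**
     * Returns true if (jobName, params) is a new instance and was "run",
     * false if the same instance has already completed.
     */
    public boolean launch(String jobName, Map<String, ?> params) {
        // Sort the parameters so the key does not depend on map ordering.
        String key = jobName + new TreeMap<String, Object>(params);
        return completed.add(key);
    }

    public static void main(String[] args) {
        JobInstanceSketch launcher = new JobInstanceSketch();
        System.out.println(launcher.launch("myJob1", Collections.singletonMap("time", 1L))); // prints "true"
        System.out.println(launcher.launch("myJob1", Collections.singletonMap("time", 1L))); // prints "false"
        System.out.println(launcher.launch("myJob1", Collections.singletonMap("time", 2L))); // prints "true"
    }
}
```

Because the timestamp differs on every request, each call to /test starts a new job instance instead of colliding with a finished one.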
4. Configure the MySQL data source (YAML file):
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/batch?useSSL=false
    username: root
    password: root
5. Configure Batch (YAML file):
spring:
  batch:
    initialize-schema: always
    job:
      enabled: false
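spring.batch.initialize-schema: always creates the Batch tables in the database at startup (this property differs between Boot 2 and Boot 1; Boot 1 used spring.batch.initializer.enabled).
spring.batch.job.enabled: false prevents the job from running automatically when the project starts.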
6. The initialized database tables:
7. Run result:
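8. To run the job on a schedule, add @Scheduled(cron = "0 0/5 * * * ?"), which runs it every five minutes. Scheduling must also be enabled with @EnableScheduling on a configuration class. The code is as follows: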
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job importJob;

    // Runs every five minutes, and can still be triggered through /test
    @Scheduled(cron = "0 0/5 * * * ?")
    @RequestMapping("/test")
    public void imp() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importJob, jobParameters);
    }
}
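The cron expression "0 0/5 * * * ?" has six fields (second, minute, hour, day of month, month, day of week), so it fires at second 0 of every minute divisible by 5. As a rough illustration of which instants that selects, the plain-Java sketch below computes the next fire time after a given moment. CronEveryFiveSketch and nextFire are hypothetical names; this hard-codes the one expression above and is not how Spring parses cron expressions.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration of the schedule "0 0/5 * * * ?"; not Spring's cron parser.
public class CronEveryFiveSketch {

    /** Returns the first instant strictly after 'after' with second 0 and minute divisible by 5. */
    public static LocalDateTime nextFire(LocalDateTime after) {
        // Move to the start of the next whole minute, then step forward
        // until the minute matches the "0/5" field.
        LocalDateTime t = after.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
        while (t.getMinute() % 5 != 0) {
            t = t.plusMinutes(1);
        }
        return t;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2018, 12, 7, 10, 2, 30);
        LocalDateTime first = nextFire(now);
        LocalDateTime second = nextFire(first);
        System.out.println(first);  // prints "2018-12-07T10:05"
        System.out.println(second); // prints "2018-12-07T10:10"
    }
}
```

So a job started at 10:02:30 would first run at 10:05:00, then at 10:10:00, and so on, regardless of when the application was started.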
I have only just started looking into this, so if you spot any problems, feel free to discuss them.