spring boot + spring batch 讀取資料庫檔案
----------------------------------------------------------------------------------------------------------------------------------
一、配置JOB
import javax.sql.DataSource;
import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
/**
* @Description : 處理具體工作業務 主要包含三個部分:讀資料、處理資料、寫資料
*/
@Configuration
@EnableBatchProcessing // 開啟批處理 系統會在啟動時執行,阻止自動執行job需要在配置檔案新增配置
public class BatchConfiguration {
/**
* 作業倉庫
*/
/*@Bean 使用資料庫記錄日誌
public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDatabaseType(DatabaseType.MYSQL.name());
return jobRepositoryFactoryBean.getObject();
}*/
/**
* 作業排程器
*/
/*public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(this.jobRepository(dataSource, transactionManager));
return jobLauncher;
}*/
//使用預設提供方式
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
// tag::readerwriterprocessor[] 1.讀資料
@Bean
public ItemReader<Bean> reader(DataSource dataSource) throws UnexpectedInputException, ParseException, Exception {
//讀資料可以讀檔案、資料庫等,我這隻使用了讀取資料庫的方式
JdbcCursorItemReader<Bean> itemReader = new JdbcCursorItemReader<Bean>();
itemReader.setDataSource(dataSource);itemReader.setSql("select * from xx");
itemReader.setRowMapper(new BeanPropertyRowMapper<Bean>(Bean.class要轉換成的bean));
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
}
itemReader.close();
return itemReader;
}
//2.處理資料
@Bean
public JdbcItemProcessor processor() {
}
//3.寫資料
@Bean
public JdbcItemWriter writer(JdbcTemplate jdbcTemplate) {
//自定義寫資料操作
JdbcItemWriter writer = new JdbcItemWriter(jdbcTemplate);
return writer;
}
// end::readerwriterprocessor[]
// tag::jobstep[] 生成job
@Bean
public Job importJob(JobBuilderFactory jobs, @Qualifier("step1")Step s1, JdbcItemJobListener listener) {
return jobs.get("importJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(s1)
.end()
.build();
}
//執行步驟
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Bean> reader,
JdbcItemWriter jdbcItemWriter, ItemProcessor<Bean, DealBean> processor) {
return stepBuilderFactory.get("step1")
.<Bean, DealBean> chunk(100) //一次處理多少資料
.reader(reader)
.processor(processor)
.writer(jdbcItemWriter)
.build();
}
}
---------------------------------------------------------------------------------------------------------------------------------
二、資料處理:
import org.springframework.batch.item.ItemProcessor
/***處理資料器
*ItemProcessor<Bean, DealBean>
*Bean為傳入資料,DealBean為處理後返回資料
*/
public class JdbcItemProcessor implements ItemProcessor<Bean, DealBean> {
public DealBean process(Bean bean) throws Exception {
// Bean bean 為讀操作資料
DealBean deal = new DealBean();
return deal;
}
}
----------------------------------------------------------------------------------------------------------------------------------
三、寫操作
import java.util.List;
import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.JdbcTemplate;
public class JdbcItemWriter implements ItemWriter<DealBean>, InitializingBean{
private JdbcTemplate jdbcTemplate;
public JdbcItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(List<? extends DealBean> dealBeans) {
//備份clear_atm
try {
for( DealBean dealBean : dealBeans ){
//刪除當前表,然後更新
jdbcTemplate.update( SQL語句,傳入引數,,,);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub
}
}
----------------------------------------------------------------------------------------------------------------------------------四、監聽
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.stereotype.Component;
/**
*Job執行監聽器
*/
@Component
public class JdbcItemJobListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB 執行完成!");
}
}
@Override
public void beforeJob(JobExecution jobExecution) {
// TODO Auto-generated method stub
super.beforeJob(jobExecution);
log.info("!!! JOB 執行開始!");
}
--------------------------------------------------------------------------------------------------------------------------------
五、定時啟動
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.RestController;
public class Controller {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job importJob;
public JobParameters jobParameters;
//定時執行
@Scheduled(cron = "0 0 0 10 * ?")
public void execute() throws Exception{
jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters();
jobLauncher.run(importJob, jobParameters);
}
}
注:以上僅為個人理解,如有問題請指教。