1. 程式人生 > spring boot + spring batch 讀取資料庫檔案

spring boot + spring batch 讀取資料庫檔案

----------------------------------------------------------------------------------------------------------------------------------

一、配置JOB

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * @Description : 處理具體工作業務  主要包含三個部分:讀資料、處理資料、寫資料 
 */
@Configuration  
@EnableBatchProcessing // 開啟批處理  系統會在啟動時執行,阻止自動執行job需要在配置檔案新增配置
public class BatchConfiguration {  

/**
     * 作業倉庫
     */
    /*@Bean  使用資料庫記錄日誌
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType(DatabaseType.MYSQL.name());

        return jobRepositoryFactoryBean.getObject();
    }*/


    /**
     * 作業排程器
     */
    /*public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(this.jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }*/
//使用預設提供方式
@Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

   // tag::readerwriterprocessor[] 1.讀資料  
   @Bean  
   public ItemReader<Bean> reader(DataSource dataSource) throws UnexpectedInputException, ParseException, Exception {  

    //讀資料可以讀檔案、資料庫等,我這隻使用了讀取資料庫的方式

        JdbcCursorItemReader<Bean> itemReader = new JdbcCursorItemReader<Bean>();

    itemReader.setDataSource(dataSource);
    itemReader.setSql("select * from  xx");
    itemReader.setRowMapper(new  BeanPropertyRowMapper<Bean>(Bean.class要轉換成的bean));
    ExecutionContext executionContext = new ExecutionContext();
    itemReader.open(executionContext);
    Object customerCredit = new Object();
    while(customerCredit != null){
        customerCredit = itemReader.read();
    }
    itemReader.close();
    return itemReader;
    }  
  
    //2.處理資料  
    @Bean  

    public JdbcItemProcessor processor() {  

        return new JdbcItemProcessor();  
    }
    
    //3.寫資料
    @Bean
    public JdbcItemWriter writer(JdbcTemplate jdbcTemplate) {
    //自定義寫資料操作
    JdbcItemWriter writer = new JdbcItemWriter(jdbcTemplate);
    return writer;
    }
    
    // end::readerwriterprocessor[]  
  
    // tag::jobstep[] 生成job
    @Bean  
    public Job importJob(JobBuilderFactory jobs, @Qualifier("step1")Step s1, JdbcItemJobListener listener) {  
        return jobs.get("importJob")  
                .incrementer(new RunIdIncrementer())  
                .listener(listener)  
                .flow(s1)  
                .end()  
                .build();
    }
    
    //執行步驟
    @Bean  
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Bean> reader,  
    JdbcItemWriter jdbcItemWriter, ItemProcessor<Bean, DealBean> processor) {  
        return stepBuilderFactory.get("step1")  
                .<Bean, DealBean> chunk(100)   //一次處理多少資料
                .reader(reader)  
                .processor(processor)  
                .writer(jdbcItemWriter)  
                .build();  
    }

}  

---------------------------------------------------------------------------------------------------------------------------------

二、資料處理:

import org.springframework.batch.item.ItemProcessor;

/**
 *處理資料器
 *ItemProcessor<Bean, DealBean>
 *Bean為傳入資料,DealBean為處理後返回資料
 */
public class JdbcItemProcessor implements ItemProcessor<Bean, DealBean> {

public DealBean process(Bean bean) throws Exception {
// Bean bean 為讀操作資料
DealBean deal = new DealBean();

return deal;

}
}

----------------------------------------------------------------------------------------------------------------------------------

三、寫操作

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.JdbcTemplate;

public class JdbcItemWriter implements ItemWriter<DealBean>, InitializingBean{

private JdbcTemplate jdbcTemplate;

public JdbcItemWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}


@Override
public void write(List<? extends DealBean> dealBeans) {

//備份clear_atm

try {
for( DealBean dealBean : dealBeans ){
//刪除當前表,然後更新
jdbcTemplate.update( SQL語句,傳入引數,,,);
        }
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub

}

}

----------------------------------------------------------------------------------------------------------------------------------

四、監聽

import org.springframework.batch.core.BatchStatus;  
import org.springframework.batch.core.JobExecution;  
import org.springframework.batch.core.listener.JobExecutionListenerSupport;  
import org.springframework.stereotype.Component;  


/**
 *Job執行監聽器 
 */
@Component  
public class JdbcItemJobListener extends JobExecutionListenerSupport {  
     
  
    @Override  
    public void afterJob(JobExecution jobExecution) {  
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {  
            log.info("!!! JOB 執行完成!");  
        }  
    }  
  
    @Override  
    public void beforeJob(JobExecution jobExecution) {  
        // TODO Auto-generated method stub  
        super.beforeJob(jobExecution);  
        log.info("!!! JOB 執行開始!");  
    }  

   --------------------------------------------------------------------------------------------------------------------------------

五、定時啟動

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.RestController;

public class Controller {

    @Autowired
    JobLauncher jobLauncher;
    @Autowired
    Job importJob;
    public JobParameters jobParameters;

    //定時執行
    @Scheduled(cron = "0 0 0 10 * ?")
    public void execute() throws Exception{
        jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters();
        jobLauncher.run(importJob, jobParameters);
    }

}

注:以上僅為個人理解,如有問題請指教。