1. 程式人生 > 程式設計 >決戰資料庫-spring batch(4)資料庫到資料庫

決戰資料庫-spring batch(4)資料庫到資料庫

tags:springbatch


1.引言

上一篇文章《快速使用元件-spring batch(3)讀檔案資料到資料庫》Spring Batch的讀、處理、寫元件進行了介紹,並且以實際案例使用了FlatFileItemReader讀文字檔案,並把每行資料對映為實體,然後使用JdbcBatchItemWriter把實體物件資料儲存到MySQL中。但在資料整合的實際應用中,更多的工作是從A資料庫到B資料庫,資料庫之間是異構的,資料與資料的欄位定義是不盡相同的,因此,在資料同步、資料抽取時,需要從源資料庫讀取資料,經過校驗、轉換,過濾、清洗,然後把資料再寫到目標資料庫。本文將在上一篇的基礎上,實現資料庫到資料庫的資料同步。簡單來說,只需要把從檔案讀資料改為從資料庫讀即可。可下載完整

示例工程程式碼參考

資料庫到資料庫

2.開發環境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發IDE: IDEA
  • 構建工具Maven: 3.3.9
  • 日誌元件logback:1.2.3
  • lombok:1.18.6

3.開發流程

上一篇文章中,已經把User資料儲存在mytest資料庫中,本文將以mytest資料庫中的test_user表為源資料,使用Spring Batch把資料同步到目標資料庫my_test1,實現MySQLMySQL的同步。注,若是不同的資料庫,在配置多資料來源時可更改資料庫驅動和連線資訊即可。關鍵程式碼如下所示:

關鍵程式碼

3.1 建立目標資料庫

MySQL中建立my_test1資料庫作為目標資料庫,執行示例工程中的sql/my_test1.sql建立使用者表,為簡單起見,目標資料表與源資料表資料表結構一樣。建立後如下:

目標資料庫

3.2 配置多資料來源

至此,我們的程式涉及三個資料庫,分別是:

  • 用於Spring Batch資料儲存的my_spring_batch
  • 源資料庫mytest
  • 目標資料庫my_test1

因此,需要先配置多資料來源,配置方法跟之前一樣,配置properties檔案的資料庫連線資訊和使用註解進行配置即可。如下:

application.properties

# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false spring.datasource.username=root spring.datasource.password=111111 # origin db spring.origin-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false spring.origin-datasource.username=root spring.origin-datasource.password=111111 # target db spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/my_test1?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false spring.target-datasource.username=root spring.target-datasource.password=111111 複製程式碼

然後使用註解注入多資料來源,如下:

DataSourceConfig.java

@Bean("datasource")
@ConfigurationProperties(prefix="spring.datasource")
@Primary
public DataSource batchDatasource() {
    return DataSourceBuilder.create().build();
}

@Bean("originDatasource")
@ConfigurationProperties(prefix="spring.origin-datasource")
public DataSource originDatasource() {
    return DataSourceBuilder.create().build();
}

@Bean("targetDatasource")
@ConfigurationProperties(prefix="spring.target-datasource")
public DataSource targetDatasource() {
    return DataSourceBuilder.create().build();
}
複製程式碼

3.3 新增讀資料元件JdbcCursorItemReader

從資料庫中讀取資料,Spring Batch提供了元件JdbcCursorItemReader,通過它,可以把資料庫的資料讀取出來,然後對映為實體User,以供後續開發。建立方法如下:

@Bean
public ItemReader db2DbItemReader(@Qualifier("originDatasource") DataSource originDatasource) {
    String readSql = " select * from test_user";
    return new JdbcCursorItemReaderBuilder<User>()
            .dataSource(originDatasource).sql(readSql)
            .verifyCursorPosition(false).rowMapper(new UserRowMapper())
            .build();
}
複製程式碼

說明:

  • 使用@Qualifier("originDatasource")標識源資料庫
  • JdbcCursorItemReaderBuilder用於構建JdbcCursorItemReader
  • 讀資料的sql語句根據實際情況編寫即可,此處是讀取整個表資料。
  • 需要把資料庫對映為實體User,使用UserRowMapper,此mapper實現RowMapper介面,把從資料庫讀取的ResultSet對映為User的欄位。

3.4 自定義處理元件Db2DbItemProcessor

讀取到資料後,當前的處理是針對title欄位,不為null的則轉為大寫即可。如下:

if(Objects.nonNull(title)){
    user.setTitle(title.toUpperCase());
}
複製程式碼

3.5 新增寫資料元件JdbcBatchItemWriter

寫入資料,同樣使用JdbcBatchItemWriter組合,編寫插入sql語句,把實體User資料插入到資料庫即可,如下:

@Bean
public ItemWriter db2DbWriter(@Qualifier("targetDatasource") DataSource targetDatasource) {
    String inserSql ="INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
            "VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)";
    return new JdbcBatchItemWriterBuilder<User>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql(inserSql)
            .dataSource(targetDatasource)
            .build();
}
複製程式碼

說明:

  • 使用JdbcBatchItemWriterBuilder進行JdbcBatchItemWriter的建立,設定插入資料庫的sql語句,同時指定資料來源即可。
  • @Qualifier("targetDatasource") DataSource datasource用於指定資料來源
  • 使用BeanPropertyItemSqlParameterSourceProvider可以直接把讀取的資料實體的屬性資料作為引數填充到sql語句中,從而實現資料插入操作。

3.6 組裝完整任務

經過上面的操作,可以使用一個java配置,把讀、寫、處理組裝成完整的stepjob,如下所示(詳細可見示例工程檔案):

@Bean
public Job db2DbJob(Step db2DbStep,JobExecutionListener db2DbListener){
    String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
    return jobBuilderFactory.get(funcName)
            .listener(db2DbListener)
            .flow(db2DbStep)
            .end().build();
}
@Bean
public Step db2DbStep(ItemReader db2DbItemReader,ItemProcessor db2DbProcessor,ItemWriter db2DbWriter){
    String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
    return stepBuilderFactory.get(funcName)
            .<User,User>chunk(10)
            .reader(db2DbItemReader)
            .processor(db2DbProcessor)
            .writer(db2DbWriter)
            .build();
}
複製程式碼

3.7 測試

參考上一文章的File2DbJobTest,編寫Db2DbJobTest檔案即可。如下:

@Test
public void testDb2DbJob() throws JobParametersInvalidException,JobExecutionAlreadyRunningException,JobRestartException,JobInstanceAlreadyCompleteException {
    //構建job引數
    JobParameters jobParameters = JobUtil.makeJobParameters();
    //執行job
    Map<String,Object> stringObjectMap = jobLauncherService.startJob(db2DbJob,jobParameters);
    //測試結果
    Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
}
複製程式碼

經過此測試,可檢視到源資料庫mytest中的test_user表中的資料,已全部同步到目標庫my_test1中的test_user中。完成資料庫到資料庫的資料同步。

4.總結

本文通過簡單的示例,從源資料庫中讀取表資料,經過處理,寫入到目標資料庫,具體一定的通用性。希望讓大家更深入的瞭解Spring Batch,並能用到實踐中。