決戰資料庫-spring batch(4)資料庫到資料庫
tags:springbatch
1.引言
上一篇文章《快速使用元件-spring batch(3)讀檔案資料到資料庫》對Spring Batch
的讀、處理、寫元件進行了介紹,並且以實際案例使用了FlatFileItemReader
讀文字檔案,並把每行資料對映為實體,然後使用JdbcBatchItemWriter
把實體物件資料儲存到MySQL
中。但在資料整合的實際應用中,更多的工作是從A資料庫到B資料庫,資料庫之間是異構的,資料與資料的欄位定義是不盡相同的,因此,在資料同步、資料抽取時,需要從源資料庫讀取資料,經過校驗、轉換,過濾、清洗,然後把資料再寫到目標資料庫。本文將在上一篇的基礎上,實現資料庫到資料庫的資料同步。簡單來說,只需要把從檔案讀資料改為從資料庫讀即可。可下載完整
2.開發環境
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 開發IDE: IDEA
- 構建工具Maven: 3.3.9
- 日誌元件logback:1.2.3
- lombok:1.18.6
3.開發流程
上一篇文章中,已經把User
資料儲存在mytest
資料庫中,本文將以mytest
資料庫中的test_user
表為源資料,使用Spring Batch
把資料同步到目標資料庫my_test1
,實現MySQL
到MySQL
的同步。注,若是不同的資料庫,在配置多資料來源時可更改資料庫驅動和連線資訊即可。關鍵程式碼如下所示:
3.1 建立目標資料庫
在MySQL
中建立my_test1
資料庫作為目標資料庫,執行示例工程中的sql/my_test1.sql
建立使用者表,為簡單起見,目標資料表與源資料表資料表結構一樣。建立後如下:
3.2 配置多資料來源
至此,我們的程式涉及三個資料庫,分別是:
- 用於
Spring Batch
資料儲存的my_spring_batch
為 - 源資料庫
mytest
- 目標資料庫
my_test1
因此,需要先配置多資料來源,配置方法跟之前一樣,配置properties
檔案的資料庫連線資訊和使用註解進行配置即可。如下:
application.properties
# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=111111
# origin db
spring.origin-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.origin-datasource.username=root
spring.origin-datasource.password=111111
# target db
spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/my_test1?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.target-datasource.username=root
spring.target-datasource.password=111111
複製程式碼
然後使用註解注入多資料來源,如下:
DataSourceConfig.java
@Bean("datasource")
@ConfigurationProperties(prefix="spring.datasource")
@Primary
public DataSource batchDatasource() {
return DataSourceBuilder.create().build();
}
@Bean("originDatasource")
@ConfigurationProperties(prefix="spring.origin-datasource")
public DataSource originDatasource() {
return DataSourceBuilder.create().build();
}
@Bean("targetDatasource")
@ConfigurationProperties(prefix="spring.target-datasource")
public DataSource targetDatasource() {
return DataSourceBuilder.create().build();
}
複製程式碼
3.3 新增讀資料元件JdbcCursorItemReader
從資料庫中讀取資料,Spring Batch
提供了元件JdbcCursorItemReader
,通過它,可以把資料庫的資料讀取出來,然後對映為實體User
,以供後續開發。建立方法如下:
@Bean
public ItemReader db2DbItemReader(@Qualifier("originDatasource") DataSource originDatasource) {
String readSql = " select * from test_user";
return new JdbcCursorItemReaderBuilder<User>()
.dataSource(originDatasource).sql(readSql)
.verifyCursorPosition(false).rowMapper(new UserRowMapper())
.build();
}
複製程式碼
說明:
- 使用
@Qualifier("originDatasource")
標識源資料庫 -
JdbcCursorItemReaderBuilder
用於構建JdbcCursorItemReader
- 讀資料的
sql
語句根據實際情況編寫即可,此處是讀取整個表資料。 - 需要把資料庫對映為實體
User
,使用UserRowMapper
,此mapper
實現RowMapper
介面,把從資料庫讀取的ResultSet
對映為User
的欄位。
3.4 自定義處理元件Db2DbItemProcessor
讀取到資料後,當前的處理是針對title
欄位,不為null
的則轉為大寫即可。如下:
if(Objects.nonNull(title)){
user.setTitle(title.toUpperCase());
}
複製程式碼
3.5 新增寫資料元件JdbcBatchItemWriter
寫入資料,同樣使用JdbcBatchItemWriter
組合,編寫插入sql
語句,把實體User
資料插入到資料庫即可,如下:
@Bean
public ItemWriter db2DbWriter(@Qualifier("targetDatasource") DataSource targetDatasource) {
String inserSql ="INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
"VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)";
return new JdbcBatchItemWriterBuilder<User>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql(inserSql)
.dataSource(targetDatasource)
.build();
}
複製程式碼
說明:
- 使用
JdbcBatchItemWriterBuilder
進行JdbcBatchItemWriter
的建立,設定插入資料庫的sql語句,同時指定資料來源即可。 -
@Qualifier("targetDatasource") DataSource datasource
用於指定資料來源 - 使用
BeanPropertyItemSqlParameterSourceProvider
可以直接把讀取的資料實體的屬性資料作為引數填充到sql
語句中,從而實現資料插入操作。
3.6 組裝完整任務
經過上面的操作,可以使用一個java配置,把讀、寫、處理組裝成完整的step
和job
,如下所示(詳細可見示例工程檔案):
@Bean
public Job db2DbJob(Step db2DbStep,JobExecutionListener db2DbListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName)
.listener(db2DbListener)
.flow(db2DbStep)
.end().build();
}
@Bean
public Step db2DbStep(ItemReader db2DbItemReader,ItemProcessor db2DbProcessor,ItemWriter db2DbWriter){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName)
.<User,User>chunk(10)
.reader(db2DbItemReader)
.processor(db2DbProcessor)
.writer(db2DbWriter)
.build();
}
複製程式碼
3.7 測試
參考上一文章的File2DbJobTest
,編寫Db2DbJobTest
檔案即可。如下:
@Test
public void testDb2DbJob() throws JobParametersInvalidException,JobExecutionAlreadyRunningException,JobRestartException,JobInstanceAlreadyCompleteException {
//構建job引數
JobParameters jobParameters = JobUtil.makeJobParameters();
//執行job
Map<String,Object> stringObjectMap = jobLauncherService.startJob(db2DbJob,jobParameters);
//測試結果
Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
}
複製程式碼
經過此測試,可檢視到源資料庫mytest
中的test_user
表中的資料,已全部同步到目標庫my_test1
中的test_user
中。完成資料庫到資料庫的資料同步。
4.總結
本文通過簡單的示例,從源資料庫中讀取表資料,經過處理,寫入到目標資料庫,具體一定的通用性。希望讓大家更深入的瞭解Spring Batch,並能用到實踐中。