增量同步-spring batch(6)動態引數繫結與增量同步
tags:springbatch
1.引言
上一篇《便捷的資料讀寫-spring batch(5)結合beetlSql進行資料讀寫》中使用Spring Batch
及BeetlSql
,對資料庫讀寫元件進行資料庫同步,實際上是全量同步。全量同步的問題在於每次需要讀取整個表資料,如果表資料量大,則資源耗費大,而且不便於對已有資料的更新。因此,在資料同步過程中,更多的使用增量同步,即通過某些條件,區分新資料進行插入,對有變化的資料進行更新,對不存在的資料進行刪除等(當然,一般都不會對資料進行物理刪除,只做邏輯刪除,因此也就變成了資料更新操作)。
增量更新更多情況需要依據上一次更新後的狀態(如時間、自增ID,資料位置等),下一次更新以上一次更新的狀態為基礎,因此,需要把每一次更新後的狀態以變數引數的方式儲存下來,下一次更新則以此狀態資料為動態引數來使用。Spring Batch
2.開發環境
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 開發IDE: IDEA
- 構建工具Maven: 3.3.9
- 日誌元件logback:1.2.3
- lombok:1.18.6
3.增量同步簡述
增量同步,是相對與全量同步,即每次同步,只需要同步源資料庫變化的部分,這樣提高了資料同步的效率。是當前資料同步的普遍方式。抽取變化的資料,又名CDC
,即Change Data Capture
變化資料捕獲。在《Pentaho Kettle解決方案:使用PDI構建開源ETL解決方案 》
CDC
作了比較詳細說明。此處簡要做一下說明,當前實現增量同步的方式有4種,分別是基於源資料的CDC
,基於觸發器的CDC
,基於快照的CDC
,基於日誌的CDC
。
3.1 基於源資料的CDC
基於源資料的CDC
要求源資料裡有相關的屬性列,利用這些屬性列,可以判斷出哪裡是增量資料,最常見的屬性列有:
-
時間戳 基於時間來標識資料,至少需要一個時間,最好兩個,一個標識建立,一個標識更新時間,所以一般我們設計資料庫時都會新增
sys_create_time
和sys_update_time
作為預設欄位,並且設計為預設當前時間和更新處理。 -
自增序列 使用資料庫表的自增序列欄位(一般是主鍵),來標識新插入的資料。不過現實中用得比較少。
此方法需要有一個臨時表來儲存上一次更新時間或,在實踐中,一般是在獨立的模式下建立此表,儲存資料。下一次更新則比較上一次時間或序列。這是用得比較普遍的方式,本文中的增量同步也是使用此方法。
3.2 基於觸發器的CDC
在資料庫中編寫觸發器,當前資料庫執行INSERT
,UPDATE
,DELETE
等語句時,可以啟用資料庫中的觸發器,然後觸發器可以把這些變更的資料儲存到中間臨時表,然後再從臨時表中獲取這些資料,同步到目標資料庫中。當然,這種方法是入侵性最強的,一般資料庫都不允許向資料庫裡新增觸發器(影響效能)。
3.3 基於快照的CDC
此方法就是一次抽取當前全部資料放到緩衝區,作為快照,下一次同步時從源資料讀取資料,然後和快照做比較,找出變化的資料。簡單來說是就做全表讀取與比較,找出變化的資料。做全表掃描,問題就在於效能,所以一般不會使用這種方式。
3.4 基於日誌的CDC
最高階和最沒有入侵性的方法就是基於日誌的方式,資料庫會把插入、更新、刪除的操作記到日誌中,如Mysql
會有binlog
,增量同步可以讀取日誌檔案,把二進位制檔案轉為可理解的方式,然後再把裡面的操作按照順序重做一遍。但是這種方式只能對同種資料庫有效,對於異構的資料庫就無法實現了。而且實現起來有一定的難度。
3.5 本示例增量同步方法說明
在本示例中,依然是基於test_user
表進行增量同步,表有欄位sys_create_time
和sys_update_time
來標識資料建立和更新時間(當前,若現實情況中只有一個時間,也可以只基於此時間,只是這樣就比較難標識此資料是更新還是插入了)。增量同步流程如下:
說明:
- 每次同步,會先讀取臨時表,獲取上一次同步後資料的時間。
- 若是第一次同步,則全部同步,若不是,則根據時間作為查詢語句的引數。
- 根據時間讀取資料後,把資料插入目標表
- 更新臨時表的資料時間,以便下一次同步。
4.Spring Batch動態引數繫結
根據上面的增量同步流程,關鍵點在於把資料時間儲存到臨時表,在資料讀取時可以作為比較的條件。而這時間引數是動態的,在任務執行時才傳遞進去,在Spring Batch
中,支援動態引數繫結,只需要使用@StepScope
註解即可,結合BeetlSql
,很快就可以實現增量同步。本示例是基於上一篇文章的示例來進一步開發的,可以下載原始碼檢視完整示例。
4.1 沿用原來資料庫配置和多資料來源
- 源資料庫:
mytest
- 目標資料庫:
my_test1
- spring batch資料庫:
my_spring_batch
- 同步的資料表:
test_user
4.2 建立臨時表
使用示例中的sql/initCdcTempTable.sql
,在my_spring_batch
庫中,建立臨時表cdc_temp
,並插入記錄為1
的記錄,標識是同步test_user
表。此處,我們只需要關注last_update_time
和current_update_time
,前者表示上一次同步完後的資料最後時間,後者表示上一次同步後的系統時間。
4.3 新增/修改dao
4.3.1 新增臨時表dao及service類
- 新增類
CdcTempRepository
根據配置,由於cdc_temp
是在my_spring_batch
,而它的讀寫是在dao.local
包中,因此需要新增dao.local
包,然後新增類CdcTempRepository
,如下所示:
@Repository
public interface CdcTempRepository extends BaseMapper<CdcTemp> {
}
複製程式碼
- 新增類
CdcTempService
,用於cdc_temp
表的讀取及資料更新 主要包括兩個函式,一個是根據ID獲取當前的cdc_temp
記錄,以便獲取資料上一次同步的資料最後時間。一個是在同步完成後,更新cdc_temp
的資料。如下:
/**
* 根據id獲取cdc_temp的記錄
* @param id 記錄ID
* @return {@link CdcTemp}
*/
public CdcTemp getCurrentCdcTemp(int id){
return cdcTempRepository.getSQLManager().single(CdcTemp.class,id);
}
/**
* 根據引數更新cdcTemp表的資料
* @param cdcTempId cdcTempId
* @param status job狀態
* @param lastUpdateTime 最後更新時間
*/
public void updateCdcTempAfterJob(int cdcTempId,BatchStatus status,Date lastUpdateTime){
//獲取
CdcTemp cdcTemp = cdcTempRepository.getSQLManager().single(CdcTemp.class,cdcTempId);
cdcTemp.setCurrentUpdateTime(DateUtil.date());
//正常完成則更新資料時間
if( status == BatchStatus.COMPLETED){
cdcTemp.setLastUpdateTime(lastUpdateTime);
}else{
log.info(LogConstants.LOG_TAG+"同步狀態異常:"+ status.toString());
}
//設定同步狀態
cdcTemp.setStatus(status.name());
cdcTempRepository.updateById(cdcTemp);
}
複製程式碼
4.3.2 修改源資料dao
在源資料dao類OriginUserRepository
新增函式getOriginIncreUser
,此函式對應user.md
中的sql
語句。
4.3.3 修改目標資料dao
在目標資料dao類TargetUserRepository
中新增函式selectMaxUpdateTime
,用於查詢同步後資料的最後時間。由於此方法的sql簡單,可以直接使用@Sql
註解,如下所示:
@Sql(value="select max(sys_update_time) from test_user")
Date selectMaxUpdateTime();
複製程式碼
4.4 修改user.md
中的sql
語句。
4.4.1 新增增量讀資料sql
在user.md
中新增增量讀資料的sql語句,如下:
getOriginIncreUser
===
* 查詢user資料
select * from test_user
WHERE 1=1
@if(!isEmpty(lastUpdateTime)){
AND (sys_create_time >= #lastUpdateTime# OR sys_update_time >= #lastUpdateTime#)
@}
複製程式碼
說明:
-
@
開頭是beetl
的語法,可以對變數讀取和邏輯判斷,此處的意思是如果變數lastUpdateTime
不為空,則按此條件進行讀取。 -
lastUpdateTime
變數由呼叫時傳入(Map
) - 具體
beetl
使用語法,可參見官方檔案
4.4.2 編寫增量插入sql語句
對於Mysql
資料庫,有insert into ... on duplicate key update ...
的用法,即可以根據唯一鍵(主鍵或唯一索引),若資料已存在,則更新,不存在,則插入。在user.md
檔案中,新增以下語句:
insertIncreUser
===
* 插入資料
insert into test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user)
values (#id#,#name#,#phone#,#title#,#email#,#gender#,#dateOfBirth#,#sysCreateTime#,#sysCreateUser#,#sysUpdateTime#,#sysUpdateUser#)
ON DUPLICATE KEY UPDATE
id = VALUES(id),name = VALUES(name),phone = VALUES(phone),title = VALUES(title),email = VALUES(email),gender = VALUES(gender),date_of_birth = VALUES(date_of_birth),sys_create_time = VALUES(sys_create_time),sys_create_user = VALUES(sys_create_user),sys_update_time = VALUES(sys_update_time),sys_update_user = VALUES(sys_update_user)
複製程式碼
4.5 編寫Spring Batch的元件
Spring Batch
檔案結構如下:
4.5.1 ItemReader
此處與之前的一致,只需要把getOriginUser
函式改為getOriginIncreUser
即可。
4.5.2 ItemWriter
此處與之前的一致,只需要把sql的ID由user.insertUser
改為user.insertIncreUser
即可。
4.5.3 新增IncrementJobEndListener
由於資料同步完後,最後一步就是要更新臨時表的最後時間資料。如下:
@Slf4j
public class IncrementJobEndListener extends JobExecutionListenerSupport {
@Autowired
private CdcTempService cdcTempService;
@Autowired
private TargetUserRepository targetUserRepository;
@Override
public void afterJob(JobExecution jobExecution) {
BatchStatus status = jobExecution.getStatus();
Date latestDate = targetUserRepository.selectMaxUpdateTime();
cdcTempService.updateCdcTempAfterJob(SyncConstants.CDC_TEMP_ID_USER,status,latestDate);
}
}
複製程式碼
說明:
- 先查詢當前資料庫中資料最後時間(
selectMaxUpdateTime
) - 更新中間表資料
cdc_temp
中的last_update_time
4.5.4 新增任務啟動時引數初始化
在資料同步的第一步,需要先初始化臨時表中的資料最後更新時間,因此在任務啟動前,先要進行任務引數設定,以便於把時間引數傳到任務中,在任務執行時使用。如下:
public JobParameters initJobParam(){
CdcTemp currentCdcTemp = cdcTempService.getCurrentCdcTemp(getCdcTempId());
//若未初始化,則先查詢資料庫中對應的最後時間
if(SyncConstants.STR_STATUS_INIT.equals(currentCdcTemp.getStatus())
|| SyncConstants.STR_STATUS_FAILED.equals(currentCdcTemp.getStatus())){
Date maxUpdateTime = selectMaxUpdateTime();
//若沒有資料,則按初始時間處理
if(Objects.nonNull(maxUpdateTime)){
currentCdcTemp.setLastUpdateTime(maxUpdateTime);
}
}
return JobUtil.makeJobParameters(currentCdcTemp);
}
複製程式碼
4.5.5 組裝完整任務
最後,需要一個IncrementBatchConfig
配置把讀、處理、寫、監聽組裝起來,值得一提的是,在配置讀元件時,由於需要使用動態引數,此處需要新增@StepScope
註解,同時在引數中使用spEL
獲取引數內容,如下所示:
@Bean
@StepScope
public ItemReader incrementItemReader(@Value("#{jobParameters['lastUpdateTime']}") String lastUpdateTime) {
IncrementUserItemReader userItemReader = new IncrementUserItemReader();
//設定引數,當前示例可不設定引數
Map<String,Object> params = CollUtil.newHashMap();
params.put(SyncConstants.STR_LAST_UPDATE_TIME,lastUpdateTime);
userItemReader.setParams(params);
return userItemReader;
}
複製程式碼
4.5.6 測試
參考上一文章的BeetlsqlJobTest
,編寫IncrementJobTest
測試檔案。由於需要測試增量同步,測試流程如下所示:
- 測試前增量新增資料
測試前,源資料表和目標資料表已經有資料,在源資料表中,執行程式碼中的
sql/user-data-new.sql
新增新的使用者。注意,由於sys_create_time
和sys_update_time
定義如下:
`sys_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`sys_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,複製程式碼
從而達到資料插入時自動生成時間,修改時也自動更新時間。
-
執行測試 以單元測試執行
incrementJob
。 -
檢視結果 執行完成後,結果如下:
增量同步後,資料如下:
5.總結
本文先對增量同步做了一個簡單的介紹,列出當前一般使用的增量同步方法,然後使用Spring Batch
和BeetlSql
使用基於時間戳的方式實現增量同步,本示例具有一定的實用性,希望能對做資料同步或相關批處理的開發者有幫助。