1. 程式人生 > 程式設計 >增量同步-spring batch(6)動態引數繫結與增量同步

增量同步-spring batch(6)動態引數繫結與增量同步

tags:springbatch


1.引言

上一篇《便捷的資料讀寫-spring batch(5)結合beetlSql進行資料讀寫》中使用Spring BatchBeetlSql,對資料庫讀寫元件進行資料庫同步,實際上是全量同步。全量同步的問題在於每次需要讀取整個表資料,如果表資料量大,則資源耗費大,而且不便於對已有資料的更新。因此,在資料同步過程中,更多的使用增量同步,即通過某些條件,區分新資料進行插入,對有變化的資料進行更新,對不存在的資料進行刪除等(當然,一般都不會對資料進行物理刪除,只做邏輯刪除,因此也就變成了資料更新操作)。

增量更新更多情況需要依據上一次更新後的狀態(如時間、自增ID,資料位置等),下一次更新以上一次更新的狀態為基礎,因此,需要把每一次更新後的狀態以變數引數的方式儲存下來,下一次更新則以此狀態資料為動態引數來使用。Spring Batch

支援任務執行時的動態引數,結合此特性,可以實現資料的增量同步。

2.開發環境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發IDE: IDEA
  • 構建工具Maven: 3.3.9
  • 日誌元件logback:1.2.3
  • lombok:1.18.6

3.增量同步簡述

增量同步,是相對與全量同步,即每次同步,只需要同步源資料庫變化的部分,這樣提高了資料同步的效率。是當前資料同步的普遍方式。抽取變化的資料,又名CDC,即Change Data Capture變化資料捕獲。在《Pentaho Kettle解決方案:使用PDI構建開源ETL解決方案 》

一書中,對CDC作了比較詳細說明。此處簡要做一下說明,當前實現增量同步的方式有4種,分別是基於源資料的CDC,基於觸發器的CDC,基於快照的CDC,基於日誌的CDC

3.1 基於源資料的CDC

基於源資料的CDC要求源資料裡有相關的屬性列,利用這些屬性列,可以判斷出哪裡是增量資料,最常見的屬性列有:

  • 時間戳 基於時間來標識資料,至少需要一個時間,最好兩個,一個標識建立,一個標識更新時間,所以一般我們設計資料庫時都會新增sys_create_timesys_update_time作為預設欄位,並且設計為預設當前時間和更新處理。

  • 自增序列 使用資料庫表的自增序列欄位(一般是主鍵),來標識新插入的資料。不過現實中用得比較少。

此方法需要有一個臨時表來儲存上一次更新時間或,在實踐中,一般是在獨立的模式下建立此表,儲存資料。下一次更新則比較上一次時間或序列。這是用得比較普遍的方式,本文中的增量同步也是使用此方法。

3.2 基於觸發器的CDC

在資料庫中編寫觸發器,當前資料庫執行INSERTUPDATEDELETE等語句時,可以啟用資料庫中的觸發器,然後觸發器可以把這些變更的資料儲存到中間臨時表,然後再從臨時表中獲取這些資料,同步到目標資料庫中。當然,這種方法是入侵性最強的,一般資料庫都不允許向資料庫裡新增觸發器(影響效能)。

3.3 基於快照的CDC

此方法就是一次抽取當前全部資料放到緩衝區,作為快照,下一次同步時從源資料讀取資料,然後和快照做比較,找出變化的資料。簡單來說是就做全表讀取與比較,找出變化的資料。做全表掃描,問題就在於效能,所以一般不會使用這種方式。

3.4 基於日誌的CDC

最高階和最沒有入侵性的方法就是基於日誌的方式,資料庫會把插入、更新、刪除的操作記到日誌中,如Mysql會有binlog,增量同步可以讀取日誌檔案,把二進位制檔案轉為可理解的方式,然後再把裡面的操作按照順序重做一遍。但是這種方式只能對同種資料庫有效,對於異構的資料庫就無法實現了。而且實現起來有一定的難度。

3.5 本示例增量同步方法說明

在本示例中,依然是基於test_user表進行增量同步,表有欄位sys_create_timesys_update_time來標識資料建立和更新時間(當前,若現實情況中只有一個時間,也可以只基於此時間,只是這樣就比較難標識此資料是更新還是插入了)。增量同步流程如下:

流程

說明:

  • 每次同步,會先讀取臨時表,獲取上一次同步後資料的時間。
  • 若是第一次同步,則全部同步,若不是,則根據時間作為查詢語句的引數。
  • 根據時間讀取資料後,把資料插入目標表
  • 更新臨時表的資料時間,以便下一次同步。

4.Spring Batch動態引數繫結

根據上面的增量同步流程,關鍵點在於把資料時間儲存到臨時表,在資料讀取時可以作為比較的條件。而這時間引數是動態的,在任務執行時才傳遞進去,在Spring Batch中,支援動態引數繫結,只需要使用@StepScope註解即可,結合BeetlSql,很快就可以實現增量同步。本示例是基於上一篇文章的示例來進一步開發的,可以下載原始碼檢視完整示例。

4.1 沿用原來資料庫配置和多資料來源

  • 源資料庫: mytest
  • 目標資料庫: my_test1
  • spring batch資料庫: my_spring_batch
  • 同步的資料表:test_user

4.2 建立臨時表

使用示例中的sql/initCdcTempTable.sql,在my_spring_batch庫中,建立臨時表cdc_temp,並插入記錄為1的記錄,標識是同步test_user表。此處,我們只需要關注last_update_timecurrent_update_time,前者表示上一次同步完後的資料最後時間,後者表示上一次同步後的系統時間。

4.3 新增/修改dao

4.3.1 新增臨時表dao及service類

  • 新增類CdcTempRepository

根據配置,由於cdc_temp是在my_spring_batch,而它的讀寫是在dao.local包中,因此需要新增dao.local包,然後新增類CdcTempRepository,如下所示:

@Repository
public interface CdcTempRepository extends BaseMapper<CdcTemp> {
}
複製程式碼
  • 新增類CdcTempService,用於cdc_temp表的讀取及資料更新 主要包括兩個函式,一個是根據ID獲取當前的cdc_temp記錄,以便獲取資料上一次同步的資料最後時間。一個是在同步完成後,更新cdc_temp的資料。如下:
/**
 * 根據id獲取cdc_temp的記錄
 * @param id 記錄ID
 * @return {@link CdcTemp}
 */
public CdcTemp getCurrentCdcTemp(int id){
    return cdcTempRepository.getSQLManager().single(CdcTemp.class,id);
}

/**
 * 根據引數更新cdcTemp表的資料
 * @param cdcTempId cdcTempId
 * @param status job狀態
 * @param lastUpdateTime 最後更新時間
 */
public void updateCdcTempAfterJob(int cdcTempId,BatchStatus status,Date lastUpdateTime){
    //獲取
    CdcTemp cdcTemp = cdcTempRepository.getSQLManager().single(CdcTemp.class,cdcTempId);
    cdcTemp.setCurrentUpdateTime(DateUtil.date());
    //正常完成則更新資料時間
    if( status == BatchStatus.COMPLETED){
        cdcTemp.setLastUpdateTime(lastUpdateTime);
    }else{
        log.info(LogConstants.LOG_TAG+"同步狀態異常:"+ status.toString());
    }
    //設定同步狀態
    cdcTemp.setStatus(status.name());
    cdcTempRepository.updateById(cdcTemp);
}
複製程式碼

4.3.2 修改源資料dao

在源資料dao類OriginUserRepository新增函式getOriginIncreUser,此函式對應user.md中的sql語句。

4.3.3 修改目標資料dao

在目標資料dao類TargetUserRepository中新增函式selectMaxUpdateTime,用於查詢同步後資料的最後時間。由於此方法的sql簡單,可以直接使用@Sql註解,如下所示:

@Sql(value="select max(sys_update_time) from test_user")
Date selectMaxUpdateTime();
複製程式碼

4.4 修改user.md中的sql語句。

4.4.1 新增增量讀資料sql

user.md中新增增量讀資料的sql語句,如下:

getOriginIncreUser
===
* 查詢user資料

select * from test_user
WHERE 1=1
@if(!isEmpty(lastUpdateTime)){
AND (sys_create_time >= #lastUpdateTime# OR sys_update_time >= #lastUpdateTime#)
@}
複製程式碼

說明:

  • @開頭是beetl的語法,可以對變數讀取和邏輯判斷,此處的意思是如果變數lastUpdateTime不為空,則按此條件進行讀取。
  • lastUpdateTime變數由呼叫時傳入(Map
  • 具體beetl使用語法,可參見官方檔案

4.4.2 編寫增量插入sql語句

對於Mysql資料庫,有insert into ... on duplicate key update ...的用法,即可以根據唯一鍵(主鍵或唯一索引),若資料已存在,則更新,不存在,則插入。在user.md檔案中,新增以下語句:

insertIncreUser
===
* 插入資料

insert into test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user)
values (#id#,#name#,#phone#,#title#,#email#,#gender#,#dateOfBirth#,#sysCreateTime#,#sysCreateUser#,#sysUpdateTime#,#sysUpdateUser#)
ON DUPLICATE KEY UPDATE 
id = VALUES(id),name = VALUES(name),phone = VALUES(phone),title = VALUES(title),email = VALUES(email),gender = VALUES(gender),date_of_birth = VALUES(date_of_birth),sys_create_time = VALUES(sys_create_time),sys_create_user = VALUES(sys_create_user),sys_update_time = VALUES(sys_update_time),sys_update_user = VALUES(sys_update_user)
複製程式碼

4.5 編寫Spring Batch的元件

Spring Batch檔案結構如下:

檔案結構

4.5.1 ItemReader

此處與之前的一致,只需要把getOriginUser函式改為getOriginIncreUser即可。

4.5.2 ItemWriter

此處與之前的一致,只需要把sql的ID由user.insertUser改為user.insertIncreUser即可。

4.5.3 新增IncrementJobEndListener

由於資料同步完後,最後一步就是要更新臨時表的最後時間資料。如下:

@Slf4j
public class IncrementJobEndListener extends JobExecutionListenerSupport {

    @Autowired
    private CdcTempService cdcTempService;

    @Autowired
    private TargetUserRepository targetUserRepository;

    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        Date latestDate  = targetUserRepository.selectMaxUpdateTime();
        cdcTempService.updateCdcTempAfterJob(SyncConstants.CDC_TEMP_ID_USER,status,latestDate);
    }
}
複製程式碼

說明:

  • 先查詢當前資料庫中資料最後時間(selectMaxUpdateTime)
  • 更新中間表資料cdc_temp中的last_update_time

4.5.4 新增任務啟動時引數初始化

在資料同步的第一步,需要先初始化臨時表中的資料最後更新時間,因此在任務啟動前,先要進行任務引數設定,以便於把時間引數傳到任務中,在任務執行時使用。如下:

public JobParameters initJobParam(){
    CdcTemp currentCdcTemp = cdcTempService.getCurrentCdcTemp(getCdcTempId());
    //若未初始化,則先查詢資料庫中對應的最後時間
    if(SyncConstants.STR_STATUS_INIT.equals(currentCdcTemp.getStatus())
            || SyncConstants.STR_STATUS_FAILED.equals(currentCdcTemp.getStatus())){
        Date maxUpdateTime = selectMaxUpdateTime();
        //若沒有資料,則按初始時間處理
        if(Objects.nonNull(maxUpdateTime)){
            currentCdcTemp.setLastUpdateTime(maxUpdateTime);
        }
    }
    return JobUtil.makeJobParameters(currentCdcTemp);
}
複製程式碼

4.5.5 組裝完整任務

最後,需要一個IncrementBatchConfig配置把讀、處理、寫、監聽組裝起來,值得一提的是,在配置讀元件時,由於需要使用動態引數,此處需要新增@StepScope註解,同時在引數中使用spEL獲取引數內容,如下所示:

@Bean
@StepScope
public ItemReader incrementItemReader(@Value("#{jobParameters['lastUpdateTime']}") String lastUpdateTime) {
    IncrementUserItemReader userItemReader = new IncrementUserItemReader();
    //設定引數,當前示例可不設定引數
    Map<String,Object> params = CollUtil.newHashMap();
    params.put(SyncConstants.STR_LAST_UPDATE_TIME,lastUpdateTime);
    userItemReader.setParams(params);

    return userItemReader;
}
複製程式碼

4.5.6 測試

參考上一文章的BeetlsqlJobTest,編寫IncrementJobTest測試檔案。由於需要測試增量同步,測試流程如下所示:

  • 測試前增量新增資料 測試前,源資料表和目標資料表已經有資料,在源資料表中,執行程式碼中的sql/user-data-new.sql新增新的使用者。注意,由於sys_create_timesys_update_time定義如下:
`sys_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`sys_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,複製程式碼

從而達到資料插入時自動生成時間,修改時也自動更新時間。

  • 執行測試 以單元測試執行incrementJob

  • 檢視結果 執行完成後,結果如下:

    輸出

增量同步後,資料如下:

結果

5.總結

本文先對增量同步做了一個簡單的介紹,列出當前一般使用的增量同步方法,然後使用Spring BatchBeetlSql使用基於時間戳的方式實現增量同步,本示例具有一定的實用性,希望能對做資料同步或相關批處理的開發者有幫助。