1. 程式人生 > >基於canal的client-adapter資料同步必讀指南

基於canal的client-adapter資料同步必讀指南

本文將介紹canal專案中client-adapter的使用,以及落地生產中需要考慮的可靠性、高可用與監控報警。(基於canal 1.1.4版本)

 

canal作為mysql的實時資料訂閱元件,實現了對mysql binlog資料的抓取。

雖然阿里也開源了一個純粹從mysql同步資料到mysql的專案otter(github.com/alibaba/otter,基於canal的),實現了mysql的單向同步、雙向同步等能力。但是我們經常有從mysql同步資料到es、hbase等儲存的需求,就需要使用者自己用canal-client獲取資料進行消費,比較麻煩。

從1.1.1版本開始,canal實現了一個配套的落地模組,實現對canal訂閱的訊息進行消費,就是client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。

目前的最新穩定版1.1.4版本中,client-adapter已經實現了同步資料到RDS、ES、HBase的能力。

1. Client-Adapter基本能力

目前Adapter具備以下基本能力:

  • 對接上游訊息,包括kafka、rocketmq、canal-server
  • 實現mysql資料的增量同步
  • 實現mysql資料的全量同步
  • 下游寫入支援mysql、es、hbase

2.Client-Adapter架構

Adapter本質上是為了將canal-server訂閱到的實時增量資料進行消費,所以必須有上游canal-server產生資料。

整體架構如下:

 

3. 遷移與同步配置(以Mysql為例)

官方文件地址:github.com/alibaba/canal/wiki/Sync-RDB

下面給出實踐過程中的注意事項。

3.1 引數配置

1)總配置檔案application.yml

 

說明:

  • 一份資料可以被多個group同時消費, 多個group之間會是一個並行執行, 一個group內部是一個序列執行多個outerAdapters, 比如例子中logger和hbase
  • 目前client adapter資料訂閱的方式支援兩種,直連canal server 或者 訂閱kafka/RocketMQ的訊息
  • zookeeperHosts填了以後,可以支援分散式鎖;如果對接Canal-Server為叢集模式,那麼還是需要填寫的,具體原因見下面高可用部分。

2)對應任務的Adapter配置

同步到mysql去的任務配置在conf/rdb路徑下,本文使用的任務配置檔名叫 mysql1.yml

 

注意!targetPk下面填的是源主鍵和目標主鍵的對映關係, srcPk:targetPk。

3)日誌格式修改

logback.xml中預設日誌等級為debug,線上使用時,記得改到info,否則日誌會打爆

3.2 增量同步能力

1) DML 增量同步

完成上面的配置,啟動後就能正常訂閱增量資料了。Adapter能夠接收到mq到資訊,並在目標庫投遞成功。

具體會打出如下日誌。

 

2)DDL同步

如果需要使用DDL同步能力,必須在rdb中配置mirroDb為true才可以。

 

3.3 全量同步能力

Adapter提供了全量同步的能力,具體操作可以參考官網 github.com/alibaba/canal/wiki/ClientAdapter中的3.2節。

這裡我們使用命令

curl http://127.0.0.1:8081/etl/rdb/mysql1/mysql1.yml -X POST 

輸出結果如下

 

4. 動態配置

4.1 任務開關

curl http://127.0.0.1:8081/syncSwitch/dts-dbvtest-insertdata/on -X PUT

如果在application.yml裡面配置了zk地址,那麼會使用分散式開關,這個任務開關會註冊到zk上,對任意機器執行開關,會把所有同樣任務的機器進行啟停。

相關原始碼實現如下:

  • 獲取zk上的任務開關狀態資訊
  • 如果是false,就斷開連線

 

4.2 配置變更

1)本地配置檔案

adapter預設是讀取本地的配置檔案進行配置的。

有個比較意外的地方,就是修改配置檔案,任務會自動重新整理配置,實現了動態配置。

我們看下實現原理。

  • 繼承了FileAlterationListenerAdaptor
  • 發現檔案變更後
  • 銷燬目前的canalAdapterService
  • 重新整理contextRefresher
  • sleep 2秒
  • 重新初始化canalAdapterService

 

最終日誌會列印

 

2)基於mysql的遠端配置

如果配置了多個adapter,可以採用mysql儲存配置資訊,實現全域性統一的配置。

這個的實現原理也比較簡單:

  • 本地非同步執行緒輪訓mysql
  • 如果有更新就將更新的配置寫入本地配置檔案
  • 動態更新

5. 資料可靠性分析

5.1 ack機制

Adapter的一個任務採用一個多執行緒模型。

  • 主執行緒抓取mq的message寫入佇列queue,CountDownLatch等待
  • 非同步執行緒poll佇列queue,投遞下游
  • 投遞成功後,主執行緒釋放latch,向mq返回ack

這裡需要注意,這裡同步的行為是重新執行一次,比如update一行資料,如果目標庫由於某種原因沒有這條資料的主鍵id,導致update返回0,也是認為消費成功了

5.2 重試機制

application.yml裡面的retries引數用於從queue佇列poll後,投遞下游的重試。

需要仔細權衡一下,重試間隔0.5s,可以設定個x次,避免網路抖動丟失資料。

重試次數到了,會自動ack。所以這裡在使用過程中需要注意採集失敗的日誌,及時報警提醒。

6. 效能問題分析

具體效能要求還是需要通過壓測來得到結論。

這裡給出兩個從原始碼中看到的效能優化相關的點。

6.1 全量同步多執行緒

全量同步的時候,同步效率是一個值得考慮的問題。

adapter對全量資料同步效率做了一些設計,當全量同步數量大於1W會開多執行緒,程式碼如下所示:

 

但是這裡有個mysql的深分頁的問題,可以注意一下,會對源資料庫造成比較大的效能壓力。

6.2 全量同步select *

全量同步的另一個效率問題,就在於select * ,避免客戶端記憶體被打爆。

看了下原始碼,果然也已經考慮了這個問題,開啟了JDBC的流式查詢。

 

7. 監控告警

如果要在生產使用,少不了監控告警的輔助。

雖然Adapter不像canal-server那樣提供了監控指標的相關api,但是我們還是可以做一些輔助的監控告警。

1) mq的訊息堆積告警

利用mq已有的topic下的堆積告警,如果Adapter出現故障,造成了mq的訊息堆積,可以及時發現。

2) 日誌異常告警

Adapter有自己的日誌格式,可以跟已有監控系統確認下日誌收集的配置方式與日誌解析格式。

然後通過修改 conf/logback.xml的pattern來修改日誌的列印格式,進行配置採集。

8. 高可用

通過原始碼閱讀發現

tcp模式支援通過zk做HA(非自身高可用),mq模式不支援zk做HA

 

TCP模式需要HA跟我們的HA理解又不太一樣。

因為需要直接對接上游的Canal-Server,而Canal-Server的HA會導致ip變化,所以adapter的tcp模式的HA是為了支援這個,可以監聽IP變化,對接不同的上游server,並不是自身的高可用架構。

而MQ模式本身是不支援HA的。

但是,我們如果我們對接上游MQ的模式,就可以做一個取巧的高可用。

目前從binlog抓取mq以後,只會投遞到指定topic的一個佇列中(即使雜湊做了多佇列,道理也一樣),因此,mq消費時採用叢集模式,就會只有一個client能夠順序消費對應佇列中的訊息。

這樣,我們部署兩臺adapter,有兩個mq的消費者同時執行,正常情況下,只會有一臺機器在消費任務,一旦一臺機器掛了,mq會自動用另一臺機器的任務進行繼續消費。做了一個簡易的高可用。

 


缺點也比較明顯,任務無法負載均衡,只能跑在一臺機器上

因此,需要考慮分多個消費者組進行任務處理。

 

推薦閱讀:

canal十分鐘入門

canal叢集版 + admin控制檯 最新搭建姿勢

canal原始碼分析大綱

 

都看到最後了,原創不易,點個關注,點個贊吧~
文章持續更新,可以微信搜尋「阿丸筆記 」第一時間閱讀,回覆關鍵字【學習】有我準備的一線大廠面試資料。
知識碎片重新梳理,構建Java知識圖譜:github.com/saigu/JavaK…(歷史文章查閱非常方便)

 

  &nbs