基於canal的client-adapter資料同步必讀指南
本文將介紹canal專案中client-adapter的使用,以及落地生產中需要考慮的可靠性、高可用與監控報警。(基於canal 1.1.4版本)
canal作為mysql的實時資料訂閱元件,實現了對mysql binlog資料的抓取。
雖然阿里也開源了一個純粹從mysql同步資料到mysql的專案otter(github.com/alibaba/otter,基於canal的),實現了mysql的單向同步、雙向同步等能力。但是我們經常有從mysql同步資料到es、hbase等儲存的需求,就需要使用者自己用canal-client獲取資料進行消費,比較麻煩。
從1.1.1版本開始,canal實現了一個配套的落地模組,實現對canal訂閱的訊息進行消費,就是client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。
目前的最新穩定版1.1.4版本中,client-adapter已經實現了同步資料到RDS、ES、HBase的能力。
1. Client-Adapter基本能力
目前Adapter具備以下基本能力:
- 對接上游訊息,包括kafka、rocketmq、canal-server
- 實現mysql資料的增量同步
- 實現mysql資料的全量同步
- 下游寫入支援mysql、es、hbase
2.Client-Adapter架構
Adapter本質上是為了將canal-server訂閱到的實時增量資料進行消費,所以必須有上游canal-server產生資料。
整體架構如下:
3. 遷移與同步配置(以Mysql為例)
官方文件地址:github.com/alibaba/canal/wiki/Sync-RDB
下面給出實踐過程中的注意事項。
3.1 引數配置
1)總配置檔案application.yml
說明:
- 一份資料可以被多個group同時消費, 多個group之間會是一個並行執行, 一個group內部是一個序列執行多個outerAdapters, 比如例子中logger和hbase
- 目前client adapter資料訂閱的方式支援兩種,直連canal server 或者 訂閱kafka/RocketMQ的訊息
- zookeeperHosts填了以後,可以支援分散式鎖;如果對接Canal-Server為叢集模式,那麼還是需要填寫的,具體原因見下面高可用部分。
2)對應任務的Adapter配置
同步到mysql去的任務配置在conf/rdb路徑下,本文使用的任務配置檔名叫 mysql1.yml
注意!targetPk下面填的是源主鍵和目標主鍵的對映關係, srcPk:targetPk。
3)日誌格式修改
logback.xml中預設日誌等級為debug,線上使用時,記得改到info,否則日誌會打爆
3.2 增量同步能力
1) DML 增量同步
完成上面的配置,啟動後就能正常訂閱增量資料了。Adapter能夠接收到mq到資訊,並在目標庫投遞成功。
具體會打出如下日誌。
2)DDL同步
如果需要使用DDL同步能力,必須在rdb中配置mirroDb為true才可以。
3.3 全量同步能力
Adapter提供了全量同步的能力,具體操作可以參考官網 github.com/alibaba/canal/wiki/ClientAdapter中的3.2節。
這裡我們使用命令
curl http://127.0.0.1:8081/etl/rdb/mysql1/mysql1.yml -X POST
輸出結果如下
4. 動態配置
4.1 任務開關
curl http://127.0.0.1:8081/syncSwitch/dts-dbvtest-insertdata/on -X PUT
如果在application.yml裡面配置了zk地址,那麼會使用分散式開關,這個任務開關會註冊到zk上,對任意機器執行開關,會把所有同樣任務的機器進行啟停。
相關原始碼實現如下:
- 獲取zk上的任務開關狀態資訊
- 如果是false,就斷開連線
4.2 配置變更
1)本地配置檔案
adapter預設是讀取本地的配置檔案進行配置的。
有個比較意外的地方,就是修改配置檔案,任務會自動重新整理配置,實現了動態配置。
我們看下實現原理。
- 繼承了FileAlterationListenerAdaptor
- 發現檔案變更後
- 銷燬目前的canalAdapterService
- 重新整理contextRefresher
- sleep 2秒
- 重新初始化canalAdapterService
最終日誌會列印
2)基於mysql的遠端配置
如果配置了多個adapter,可以採用mysql儲存配置資訊,實現全域性統一的配置。
這個的實現原理也比較簡單:
- 本地非同步執行緒輪訓mysql
- 如果有更新就將更新的配置寫入本地配置檔案
- 動態更新
5. 資料可靠性分析
5.1 ack機制
Adapter的一個任務採用一個多執行緒模型。
- 主執行緒抓取mq的message寫入佇列queue,CountDownLatch等待
- 非同步執行緒poll佇列queue,投遞下游
- 投遞成功後,主執行緒釋放latch,向mq返回ack
這裡需要注意,這裡同步的行為是重新執行一次,比如update一行資料,如果目標庫由於某種原因沒有這條資料的主鍵id,導致update返回0,也是認為消費成功了
5.2 重試機制
application.yml裡面的retries引數用於從queue佇列poll後,投遞下游的重試。
需要仔細權衡一下,重試間隔0.5s,可以設定個x次,避免網路抖動丟失資料。
重試次數到了,會自動ack。所以這裡在使用過程中需要注意採集失敗的日誌,及時報警提醒。
6. 效能問題分析
具體效能要求還是需要通過壓測來得到結論。
這裡給出兩個從原始碼中看到的效能優化相關的點。
6.1 全量同步多執行緒
全量同步的時候,同步效率是一個值得考慮的問題。
adapter對全量資料同步效率做了一些設計,當全量同步數量大於1W會開多執行緒,程式碼如下所示:
但是這裡有個mysql的深分頁的問題,可以注意一下,會對源資料庫造成比較大的效能壓力。
6.2 全量同步select *
全量同步的另一個效率問題,就在於select * ,避免客戶端記憶體被打爆。
看了下原始碼,果然也已經考慮了這個問題,開啟了JDBC的流式查詢。
7. 監控告警
如果要在生產使用,少不了監控告警的輔助。
雖然Adapter不像canal-server那樣提供了監控指標的相關api,但是我們還是可以做一些輔助的監控告警。
1) mq的訊息堆積告警
利用mq已有的topic下的堆積告警,如果Adapter出現故障,造成了mq的訊息堆積,可以及時發現。
2) 日誌異常告警
Adapter有自己的日誌格式,可以跟已有監控系統確認下日誌收集的配置方式與日誌解析格式。
然後通過修改 conf/logback.xml的pattern來修改日誌的列印格式,進行配置採集。
8. 高可用
通過原始碼閱讀發現
tcp模式支援通過zk做HA(非自身高可用),mq模式不支援zk做HA
TCP模式需要HA跟我們的HA理解又不太一樣。
因為需要直接對接上游的Canal-Server,而Canal-Server的HA會導致ip變化,所以adapter的tcp模式的HA是為了支援這個,可以監聽IP變化,對接不同的上游server,並不是自身的高可用架構。
而MQ模式本身是不支援HA的。
但是,我們如果我們對接上游MQ的模式,就可以做一個取巧的高可用。
目前從binlog抓取mq以後,只會投遞到指定topic的一個佇列中(即使雜湊做了多佇列,道理也一樣),因此,mq消費時採用叢集模式,就會只有一個client能夠順序消費對應佇列中的訊息。
這樣,我們部署兩臺adapter,有兩個mq的消費者同時執行,正常情況下,只會有一臺機器在消費任務,一旦一臺機器掛了,mq會自動用另一臺機器的任務進行繼續消費。做了一個簡易的高可用。
缺點也比較明顯,任務無法負載均衡,只能跑在一臺機器上
因此,需要考慮分多個消費者組進行任務處理。
推薦閱讀:
canal十分鐘入門
canal叢集版 + admin控制檯 最新搭建姿勢
canal原始碼分析大綱
都看到最後了,原創不易,點個關注,點個贊吧~
文章持續更新,可以微信搜尋「阿丸筆記 」第一時間閱讀,回覆關鍵字【學習】有我準備的一線大廠面試資料。
知識碎片重新梳理,構建Java知識圖譜:github.com/saigu/JavaK…(歷史文章查閱非常方便)
&nbs