1. 程式人生 > 程式設計 >?史上最全的分散式資料同步中間間canal 之結束篇

?史上最全的分散式資料同步中間間canal 之結束篇

前言

文字已收錄至我的GitHub倉庫,歡迎Star:github.com/bin39232820…
種一棵樹最好的時間是十年前,其次是現在
我知道很多人不玩qq了,但是懷舊一下,歡迎加入六脈神劍Java菜鳥學習群,群聊號碼:549684836 鼓勵大家在技術的路上寫部落格

絮叨

canal的入門篇,我已經帶大家搭建好了canal 並用Java客服端 去訂閱canal 從mysql那邊拿到的binlog日誌

其實我們生產中的作用也是差不多這麼玩的,只是說完善一點、,這篇帶大家說說canal的一些原理。 因為這篇是進階,所以建議大家呢一定要按我前面的把第一個demo做出來,再看第二篇
?史上最全的分散式資料同步中間間canal之入門篇

canal的工作原理:

原理相對比較簡單:

  • canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議
  • mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  • canal解析binary log物件(原始為byte流)

canal的架構

說明:

  • server代表一個canal執行例項,對應於一個jvm
  • instance對應於一個資料佇列 (1個server對應1..n個instance)

instance模組:

  • eventParser (資料來源接入,模擬slave協議和master進行互動,協議解析)
  • eventSink (Parser和Store連結器,進行資料過濾,加工,分發的工作)
  • eventStore (資料儲存)
  • metaManager (增量訂閱&消費資訊管理器)

EventParser設計

  • 第一步 Connection獲取上一次解析成功的位置 (如果第一次啟動,則獲取初始指定的位置或者是當前資料庫的binlog位點)
  • 第二步 nnection建立連結,傳送BINLOG_DUMP指令
  • 第三步 Mysql開始推送Binaly Log
  • 第四步 接收到的Binaly Log的通過Binlog parser進行協議解析,補充一些特定資訊。
  • 第五步 傳遞給EventSink模組進行資料儲存,是一個阻塞操作,直到儲存成功
  • 第六步 儲存成功後,定時記錄Binaly Log位置

EventSink設計

  • 資料過濾:支援萬用字元的過濾模式,表名,欄位內容等
  • 資料路由/分發:解決1:n (1個parser對應多個store的模式)
  • 資料歸併:解決n:1 (多個parser對應1個store)
  • 資料加工:在進入store之前進行額外的處理,比如join

EventStore設計

    1. 目前僅實現了Memory記憶體模式,後續計劃增加本地file儲存,mixed混合模式
    1. 借鑑了Disruptor的RingBuffer的實現思路

定義了3個cursor

  • Put : Sink模組進行資料儲存的最後一次寫入位置
  • Get : 資料訂閱獲取的最後一次提取位置
  • Ack : 資料消費成功的最後一次消費位置

增量訂閱/消費設計(還是要第一節的基礎,不然很難看懂)

具體的協議格式,可參見:CanalProtocol.proto

get/ack/rollback協議介紹:

Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的物件為Message,包含的內容為:

  • a. batch id 唯一標識
  • b. entries 具體的資料物件,對應的資料物件格式:EntryProtocol.proto
  • void rollback(long batchId),顧命思議,回滾上次的get請求,重新獲取資料。基於get獲取的batchId進行提交,避免誤操作
  • void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除資料。基於get獲取的batchId進行提交,避免誤操作

canal的get/ack/rollback協議和常規的jms協議有所不同,允許get/ack非同步處理,比如可以連續呼叫get多次,後續非同步按順序提交ack/rollback,專案中稱之為流式api.

資料物件格式:EntryProtocol.proto

Entry  
    Header  
        logfileName [binlog檔名]  
        logfileOffset [binlog position]  
        executeTime [發生的變更]  
        schemaName   
        tableName  
        eventType [insert/update/delete型別]  
    entryType   [事務頭BEGIN/事務尾END/資料ROWDATA]  
    storeValue  [byte資料,可展開,對應的型別為RowChange]  
      
RowChange  
    isDdl       [是否是ddl變更操作,比如create table/drop table]  
    sql     [具體的ddl sql]  
    rowDatas    [具體insert/update/delete的變更資料,可為多條,1個binlog event事件可對應多條變更,比如批處理]  
        beforeColumns [Column型別的陣列]  
        afterColumns [Column型別的陣列]  
          
Column   
    index         
    sqlType     [jdbc type]  
    name        [column name]  
    isKey       [是否為主鍵]  
    updated     [是否發生過變更]  
    isNull      [值是否為null]  
    value       [具體的內容,注意為文字]  
複製程式碼

可以提供資料庫變更前和變更後的欄位內容,針對binlog中沒有的name,isKey等資訊進行補全 可以提供ddl的變更語句

其實canal還可以直接用mq去訂閱 這樣就不用再寫一個Java客戶端了

具體參考:配置Canal投遞訊息到RocketMQ

喜歡的一句話

昨天看書聽到了一句非常喜歡發話送給大家:這個是嶽麓書院的一幅對聯,大家有機會可以去看看

是非審之於己,譭譽聽之於人,得失安之於數,成敗歸之於零,

是是非非由自己的內心來判斷,詆譭還是讚譽隨別人去說,得到的與失去的都只是天定的。

結尾

canal系列完結了,其實講的不是很深,但是基本上能自己用了,如果要深入還得靠大家自己,因為我自己也還只是瞭解層面,感謝大家的支援,下期打算做Java基礎吧 ,感覺Java基礎,要講的東西好多呀。正好大家明年面試 哈哈

因為博主也是一個開發萌新 我也是一邊學一邊寫 我有個目標就是一週 二到三篇 希望能堅持個一年吧 希望各位大佬多提意見,讓我多學習,一起進步。

日常求贊

好了各位,以上就是這篇文章的全部內容了,能看到這裡的人呀,都是人才

創作不易,各位的支援和認可,就是我創作的最大動力,我們下篇文章見

六脈神劍 | 文 【原創】如果本篇部落格有任何錯誤,請批評指教,不勝感激 !