1. 程式人生 > >canal架構原理

canal架構原理

  • canal架構設計

    說明:

    • server代表一個canal執行例項,對應於一個jvm
    • instance對應於一個數據佇列 (1個server對應1..n個instance)

    instance模組:

    • eventParser (資料來源接入,模擬slave協議和master進行互動,協議解析)
    • eventSink (Parser和Store連結器,進行資料過濾,加工,分發的工作)
    • eventStore (資料儲存)
    • metaManager (增量訂閱&消費資訊管理器)

    EventParser

    整個parser過程大致可分為幾部:

    1. Connection獲取上一次解析成功的位置(如果第一次啟動,則獲取初始制定的位置或者是當前資料庫的binlog位點)
    2. Connection建立連線,發生BINLOG_DUMP命令
    3. Mysql開始推送Binary Log
    4. 接收到的Binary Log通過Binlog parser進行協議解析,補充一些特定資訊
    5. 傳遞給EventSink模組進行資料儲存,是一個阻塞操作,直到儲存成功
    6. 儲存成功後,定時記錄Binary Log位置

    EventSink設計

    說明:

    • 資料過濾:支援萬用字元的過濾模式,表名,欄位內容等
    • 資料路由/分發:解決1:n (1個parser對應多個store的模式)
    • 資料歸併:解決n:1 (多個parser對應1個store)
    • 資料加工:在進入store之前進行額外的處理,比如join

    1 資料1:n業務 :

    為了合理的利用資料庫資源, 一般常見的業務都是按照schema進行隔離,然後在mysql上層或者dao這一層面上,進行一個數據源路由,遮蔽資料庫物理位置對開發的影響,阿里系主要是通過cobar/tddl來解決資料來源路由問題。 所以,一般一個數據庫例項上,會部署多個schema,每個schema會有由1個或者多個業務方關注。

    2 資料n:1業務:

    同樣,當一個業務的資料規模達到一定的量級後,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的資料需要處理時,就需要連結多個store進行處理,消費的位點就會變成多份,而且資料消費的進度無法得到儘可能有序的保證。 所以,在一定業務場景下,需要將拆分後的增量資料進行歸併處理,比如按照時間戳/全域性id進行排序歸併.

    EventStore設計

    目前實現了Memory記憶體、本地file儲存以及持久化到zookeeper以保障資料叢集共享。
    Memory記憶體的RingBuffer設計:

    定義了3個cursor

    • Put : Sink模組進行資料儲存的最後一次寫入位置
    • Get : 資料訂閱獲取的最後一次提取位置
    • Ack : 資料消費成功的最後一次消費位置

    借鑑Disruptor的RingBuffer的實現,將RingBuffer拉直來看:

    實現說明:

    • Put/Get/Ack cursor用於遞增,採用long型儲存
    • buffer的get操作,通過取餘或者與操作。(與操作: cusor & (size – 1) , size需要為2的指數,效率比較高)

    Instance設計

    instance代表了一個實際執行的資料佇列,包括了EventPaser,EventSink,EventStore等元件。
    抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:

    1. manager方式: 和你自己的內部web console/manager系統進行對接。(alibaba內部使用方式)

    2. spring方式:基於spring xml + properties進行定義,構建spring配置.

    • spring/memory-instance.xml 所有的元件(parser , sink , store)都選擇了記憶體版模式,記錄位點的都選擇了memory模式,重啟後又會回到初始位點進行解析。特點:速度最快,依賴最少
    • spring/file-instance.xml 所有的元件(parser , sink , store)都選擇了基於file持久化模式,注意,不支援HA機制.支援單機持久化
    • spring/default-instance.xml 所有的元件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證資料叢集共享. 支援HA
    • spring/group-instance.xml 主要針對需要進行多庫合併時,可以將多個物理instance合併為一個邏輯instance,提供客戶端訪問。場景:分庫業務。 比如產品資料拆分了4個庫,每個庫會有一個instance,如果不用group,業務上要消費資料時,需要啟動4個客戶端,分別連結4個instance例項。使用group後,可以在canal server上合併為一個邏輯instance,只需要啟動1個客戶端,連結這個邏輯instance即可.

    Server設計

    server代表了一個canal的執行例項,為了方便元件化使用,特意抽象了Embeded(嵌入式) / Netty(網路訪問)的兩種實現:

    • Embeded : 對latency和可用性都有比較高的要求,自己又能hold住分散式的相關技術(比如failover)
    • Netty : 基於netty封裝了一層網路協議,由canal server保證其可用性,採用的pull模型,當然latency會稍微打點折扣,不過這個也視情況而定。

    增量訂閱/消費設計

    具體的協議格式,可參見:CanalProtocol.proto
    get/ack/rollback協議介紹:

    • Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的物件為Message,包含的內容為:
    • a. batch id 唯一標識
    • b. entries 具體的資料物件,對應的資料物件格式:EntryProtocol.proto
    • void rollback(long batchId),顧命思議,回滾上次的get請求,重新獲取資料。基於get獲取的batchId進行提交,避免誤操作
    • void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除資料。基於get獲取的batchId進行提交,避免誤操作
    • canal的get/ack/rollback協議和常規的jms協議有所不同,允許get/ack非同步處理,比如可以連續呼叫get多次,後續非同步按順序提交ack/rollback,專案中稱之為流式api.
    • 流式api設計的好處:
    • get/ack非同步化,減少因ack帶來的網路延遲和操作成本 (99%的狀態都是處於正常狀態,異常的rollback屬於個別情況,沒必要為個別的case犧牲整個效能)
    • get獲取資料後,業務消費存在瓶頸或者需要多程序/多執行緒消費時,可以不停的輪詢get資料,不停的往後傳送任務,提高並行化. (作者在實際業務中的一個case:業務資料消費需要跨中美網路,所以一次操作基本在200ms以上,為了減少延遲,所以需要實施並行化)

    流式api設計:

    • 每次get操作都會在meta中產生一個mark,mark標記會遞增,保證執行過程中mark的唯一性
    • 每次的get操作,都會在上一次的mark操作記錄的cursor繼續往後取,如果mark不存在,則在last ack cursor繼續往後取
    • 進行ack時,需要按照mark的順序進行數序ack,不能跳躍ack. ack會刪除當前的mark標記,並將對應的mark位置更新為last ack cusor
    • 一旦出現異常情況,客戶端可發起rollback情況,重新置位:刪除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續往後取

    資料格式

    canal採用protobuff:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    Entry

        Header

            logfileName [binlog檔名]

            logfileOffset [binlog position]

            executeTime [發生的變更]

            schemaName

            tableName

            eventType [insert/update/delete型別]

        entryType   [事務頭BEGIN/事務尾END/資料ROWDATA]

        storeValue  [byte資料,可展開,對應的型別為RowChange]   

    RowChange

        isDdl       [是否是ddl變更操作,比如create table/drop table]

        sql     [具體的ddl sql]

        rowDatas    [具體insert/update/delete的變更資料,可為多條,1個binlog event事件可對應多條變更,比如批處理]

            beforeColumns [Column型別的陣列]

            afterColumns [Column型別的陣列]     

    Column

        index      

        sqlType     [jdbc type]

        name        [column name]

        isKey       [是否為主鍵]

        updated     [是否發生過變更]

        isNull      [值是否為null]

        value       [具體的內容,注意為文字]

    canal-message example:

    比如資料庫中的表:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    mysql> select * from person;

    +----+------+------+------+

    | id | name | age  | sex  |

    +----+------+------+------+

    |  1 | zzh  |   10 | m    |

    |  3 | zzh3 |   12 | f    |

    |  4 | zzh4 |    5 | m    |

    +----+------+------+------+

    3 rows in set (0.00 sec)

    更新一條資料(update person set age=15 where id=4):

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    ****************************************************

    * Batch Id: [2] ,count : [3] , memsize : [165] , Time : 2016-09-07 15:54:18

    * Start : [mysql-bin.000003:6354:1473234846000(2016-09-07 15:54:06)]

    * End : [mysql-bin.000003:6550:1473234846000(2016-09-07 15:54:06)]

    ****************************************************

     

    ================> binlog[mysql-bin.000003:6354] , executeTime : 1473234846000 , delay : 12225ms

     BEGIN ----> Thread id: 67

    ----------------> binlog[mysql-bin.000003:6486] , name[canal_test,person] , eventType : UPDATE , executeTime :1473234846000 , delay : 12225ms

    id : 4    type=int(11)

    name : zzh4    type=varchar(100)

    age : 15    type=int(11)    update=true

    sex : m    type=char(1)

    ----------------

     END ----> transaction id: 308

    ================> binlog[mysql-bin.000003:6550] , executeTime : 1473234846000 , delay : 12240ms

    HA機制設計

    canal的HA分為兩部分,canal server和canal client分別有對應的ha實現:

    • canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處於running,其他的處於standby狀態.
    • canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序。

    整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命週期繫結),可以看下我之前zookeeper的相關文章。

    Canal Server:

    大致步驟:

    1. canal server要啟動某個canal instance時都先向zookeeper進行一次嘗試啟動判斷 (實現:建立EPHEMERAL節點,誰建立成功就允許誰啟動)
    2. 建立zookeeper節點成功後,對應的canal server就啟動對應的canal instance,沒有建立成功的canal instance就會處於standby狀態
    3. 一旦zookeeper發現canal server A建立的節點消失後,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance.
    4. canal client每次進行connect時,會首先向zookeeper詢問當前是誰啟動了canal instance,然後和其建立連結,一旦連結不可用,會重新嘗試connect.
    5. Canal Client的方式和canal server方式類似,也是利用zokeeper的搶佔EPHEMERAL節點的方式進行控制.

    HA配置架構圖(舉例)如下所示:

    canal其他連結方式

    canal還有幾種連線方式:

    1. 單連

    2. 兩個client+兩個instance+1個mysql

    當mysql變動時,兩個client都能獲取到變動

    3. 一個server+兩個instance+兩個mysql+兩個client

    4. instance的standby配置

    整體架構

    從整體架構上來說canal是這種架構的(canal中沒有包含一個運維的console web來對接,但要運用於分散式環境中肯定需要一個Manager來管理):

    一個總體的manager system對應於n個Canal Server(物理上來說是一臺伺服器), 那麼一個Canal Server對應於n個Canal Instance(destinations). 大體上是三層結構,第二層也需要Manager統籌運維管理。

    那麼隨著Docker技術的興起,是否可以試一下下面的架構呢?

    • 一個docker中跑一個instance服務,相當於略去server這一層的概念。
    • Manager System中配置一個instance,直接調取一個docker釋出這個instance,其中包括向這個instance傳送配置資訊,啟動instance服務.
    • instance在執行過程中,定時重新整理binlog filename+ binlog position的資訊至zk。
    • 如果一個instance出現故障,instance本身報錯或者zk感知此node消失,則根據相應的資訊,比如上一步儲存的binlog filename+binlog position重新開啟一個docker服務,當然這裡可以適當的加一些重試機制。
    • 當要更新時,類似AB test, 先關閉一個docker,然後開啟新的已更新的替換,循序漸進的進行。
    • 當涉及到分表分庫時,多個物理表對應於一個邏輯表,可以將結果存於一個公共的模組(比如MQ),或者單獨存取也可以,具體情況具體分析
    • 儲存可以參考canal的多樣化:記憶體,檔案,zk,或者加入至MQ中
    • docker由此之外的工具管理,比如kubernetes
    • 也可以進一步新增HA的功能,兩個docker對應一個mysql,互為主備,類似Canal的HA架構。如果時效性不是貼彆強的場景,考慮到成本,此功能可以不採用。

    總結

    這裡總結了一下Canal的一些點,僅供參考:

    1. 原理:模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議;mysql master收到dump請求,開始推送binary log給slave(也就是canal);解析binary log物件(原始為byte流)
    2. 重複消費問題:在消費端解決。
    3. 採用開源的open-replicator來解析binlog
    4. canal需要維護EventStore,可以存取在Memory, File, zk
    5. canal需要維護客戶端的狀態,同一時刻一個instance只能有一個消費端消費
    6. 資料傳輸格式:protobuff
    7. 支援binlog format 型別:statement, row, mixed. 多次附加功能只能在row下使用,比如otter
    8. binlog position可以支援儲存在記憶體,檔案,zk中
    9. instance啟動方式:rpc/http; 內嵌
    10. 有ACK機制
    11. 無告警,無監控,這兩個功能都需要對接外部系統
    12. 方便快速部署。