Canal:同步mysql增量資料工具,一篇詳解核心知識點
老劉是一名即將找工作的研二學生,寫部落格一方面是總結大資料開發的知識點,一方面是希望能夠幫助夥伴讓自學從此不求人。由於老劉是自學大資料開發,部落格中肯定會存在一些不足,還希望大家能夠批評指正,讓我們一起進步!
背景
大資料領域資料來源有業務庫的資料,也有移動端埋點資料、伺服器端產生的日誌資料。我們在對資料進行採集時根據下游對資料的要求不同,我們可以使用不同的採集工具來進行。今天老劉給大家講的是同步mysql增量資料的工具Canal,本篇文章的大綱如下:
-
Canal 的概念 -
mysql 中主備複製實現原理 -
Canal 如何從 MySQL 中同步資料 -
Canal 的 HA 機制設計 -
各種資料同步解決方法的簡單總結
老劉爭取用這一篇文章讓大家直接上手 Canal 這個工具,不再花別的時間來學習。
mysql 主備複製實現原理
由於 Canal 是用來同步 mysql 中增量資料的,所以老劉先講 mysql 的主備複製原理,之後再講 Canal 的核心知識點。
根據這張圖,老劉把 mysql 的主備複製原理分解為如下流程:
-
主伺服器首先必須啟動二進位制日誌 binlog,用來記錄任何修改了資料庫資料的事件。 -
主伺服器將資料的改變記錄到二進位制 binlog 日誌。 -
從伺服器會將主伺服器的二進位制日誌複製到其本地的中繼日誌(Relaylog)中。這一步細化的說就是首先從伺服器會啟動一個工作執行緒 I/O 執行緒,I/O 執行緒會跟主庫建立一個普通的客戶單連線,然後在主伺服器上啟動一個特殊的二進位制轉儲(binlog dump)執行緒,這個 binlog dump 執行緒會讀取主伺服器上二進位制日誌中的事件,然後向 I/O 執行緒傳送二進位制事件,並儲存到從伺服器上的中繼日誌中。 -
從伺服器啟動 SQL 執行緒,從中繼日誌中讀取二進位制日誌,並且在從伺服器本地會再執行一次資料修改操作,從而實現從伺服器資料的更新。
那麼 mysql 主備複製實現原理就講完了,大家看完這個流程,能不能猜到 Canal 的工作原理?
Canal 核心知識點
Canal 的工作原理
Canal 的工作原理就是它模擬 MySQL slave 的互動協議,把自己偽裝為 MySQL slave,向 MySQL master 發動 dump 協議。MySQL master 收到 dump 請求後,就會開始推送 binlog 給 Canal。最後 Canal 就會解析 binlog 物件。
Canal 概念
Canal,美[kəˈnæl],是這樣讀的,意思是水道/管道/渠道,主要用途就是用來同步 MySQL 中的增量資料(可以理解為實時資料),是阿里巴巴旗下的一款純 Java 開發的開源專案。
Canal 架構
server 代表一個 canal 執行例項,對應於一個 JVM。 instance 對應於一個數據佇列,1 個 canal server 對應 1..n 個 instance instance 下的子模組:
-
EventParser:資料來源接入,模擬 salve 協議和 master 進行互動,協議解析 -
EventSink:Parser 和 Store 連結器,進行資料過濾,加工,分發的工作 -
EventStore:資料儲存 -
MetaManager: 增量訂閱&消費資訊管理器
到現在 Canal 的基本概念就講完了,那接下來就要講 Canal 如何同步 mysql 的增量資料。
Canal 同步 MySQL 增量資料
開啟 mysql binlog
我們用 Canal 同步 mysql 增量資料的前提是 mysql 的 binlog 是開啟的,阿里雲的 mysql 資料庫是預設開啟 binlog 的,但是如果我們是自己安裝的 mysql 需要手動開啟 binlog 日誌功能。
先找到 mysql 的配置檔案:
etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog-format=ROW
這裡有一個知識點是關於 binlog 的格式,老劉給大家講講。
binlog 的格式有三種:STATEMENT、ROW、MIXED
-
ROW 模式(一般就用它)
日誌會記錄每一行資料被修改的形式,不會記錄執行 SQL 語句的上下文相關資訊,只記錄要修改的資料,哪條資料被修改了,修改成了什麼樣子,只有 value,不會有 SQL 多表關聯的情況。
優點:它僅僅只需要記錄哪條資料被修改了,修改成什麼樣子了,所以它的日誌內容會非常清楚地記錄下每一行資料修改的細節,非常容易理解。
缺點:ROW 模式下,特別是資料新增的情況下,所有執行的語句都會記錄到日誌中,都將以每行記錄的修改來記錄,這樣會產生大量的日誌內容。
-
STATEMENT 模式
每條會修改資料的 SQL 語句都會被記錄下來。
缺點:由於它是記錄的執行語句,所以,為了讓這些語句在 slave 端也能正確執行,那他還必須記錄每條語句在執行過程中的一些相關資訊,也就是上下文資訊,以保證所有語句在 slave 端被執行的時候能夠得到和在 master 端執行時候相同的結果。
但目前例如 step()函式在有些版本中就不能被正確複製,在儲存過程中使用了 last-insert-id()函式,可能會使 slave 和 master 上得到不一致的 id,就是會出現資料不一致的情況,ROW 模式下就沒有。
-
MIXED 模式
以上兩種模式都使用。
Canal 實時同步
-
首先我們要配置環境,在 conf/example/instance.properties 下:
##mysqlserverId
canal.instance.mysql.slaveId=1234
#positioninfo,需要修改成自己的資料庫資訊
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
#canal.instance.standby.address=
#canal.instance.standby.journal.name=
#canal.instance.standby.position=
#canal.instance.standby.timestamp=
#username/password,需要修改成自己的資料庫資訊
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=
canal.instance.connectionCharset=UTF-8
#tableregex
canal.instance.filter.regex=.\*\\\\..\*
其中,canal.instance.connectionCharset 代表資料庫的編碼方式對應到 java 中的編碼型別,比如 UTF-8,GBK,ISO-8859-1。
-
配置完後,就要啟動了
shbin/startup.sh
關閉使用bin/stop.sh
-
觀察日誌
一般使用 cat 檢視 canal/canal.log、example/example.log
-
啟動客戶端
在 IDEA 中業務程式碼,mysql 中如果有增量資料就拉取過來,在 IDEA 控制檯打印出來
在 pom.xml 檔案中新增:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
新增客戶端程式碼:
publicclassDemo{
publicstaticvoidmain(String[]args){
//建立連線
CanalConnectorconnector=CanalConnectors.newSingleConnector(newInetSocketAddress("hadoop03",11111),
"example","","");
connector.connect();
//訂閱
connector.subscribe();
connector.rollback();
intbatchSize=1000;
intemptyCount=0;
inttotalEmptyCount=100;
while(totalEmptyCount>emptyCount){
Messagemsg=connector.getWithoutAck(batchSize);
longid=msg.getId();
List<CanalEntry.Entry>entries=msg.getEntries();
if(id==-1||entries.size()==0){
emptyCount++;
System.out.println("emptyCount:"+emptyCount);
try{
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}else{
emptyCount=0;
printEntry(entries);
}
connector.ack(id);
}
}
//batch->entries->rowchange-rowdata->cols
privatestaticvoidprintEntry(List<CanalEntry.Entry>entries){
for(CanalEntry.Entryentry:entries){
if(entry.getEntryType()==CanalEntry.EntryType.TRANSACTIONBEGIN||
entry.getEntryType()==CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
CanalEntry.RowChangerowChange=null;
try{
rowChange=CanalEntry.RowChange.parseFrom(entry.getStoreValue());
}catch(InvalidProtocolBufferExceptione){
e.printStackTrace();
}
CanalEntry.EventTypeeventType=rowChange.getEventType();
System.out.println(entry.getHeader().getLogfileName()+"__"+
entry.getHeader().getSchemaName()+"__"+eventType);
List<CanalEntry.RowData>rowDatasList=rowChange.getRowDatasList();
for(CanalEntry.RowDatarowData:rowDatasList){
for(CanalEntry.Columncolumn:rowData.getAfterColumnsList()){
System.out.println(column.getName()+"-"+
column.getValue()+"-"+
column.getUpdated());
}
}
}
}
}
-
在mysql中寫資料,客戶端就會把增量資料列印到控制檯。
Canal 的 HA 機制設計
在大資料領域很多框架都會有 HA 機制,Canal 的 HA 分為兩部分,Canal server 和 Canal client 分別有對應的 HA 實現:
-
canal server:為了減少對 mysql dump 的請求,不同 server 上的 instance 要求同一時間只能有一個處於 running,其他的處於 standby 狀態。 -
canal client:為了保證有序性,一份 instance 同一時間只能由一個 canal client 進行 get/ack/rollback 操作,否則客戶端接收無法保證有序。
整個 HA 機制的控制主要是依賴了 ZooKeeper 的幾個特性,ZooKeeper 這裡就不講了。
Canal Server:
-
canal server 要啟動某個 canal instance 時都先向 ZooKeeper 進行一次嘗試啟動判斷(建立 EPHEMERAL 節點,誰建立成功就允許誰啟動)。 -
建立 ZooKeeper 節點成功後,對應的 canal server 就啟動對應的 canal instance,沒有建立成功的 canal instance 就會處於 standby 狀態。 -
一旦 ZooKeeper 發現 canal server 建立的節點消失後,立即通知其他的 canal server 再次進行步驟 1 的操作,重新選出一個 canal server 啟動 instance。 -
canal client 每次進行 connect 時,會首先向 ZooKeeper 詢問當前是誰啟動了 canal instance,然後和其建立連線,一旦連線不可用,會重新嘗試 connect。 -
canal client 的方式和 canal server 方式類似,也是利用 ZooKeeper 的搶佔 EPHEMERAL 節點的方式進行控制。
Canal HA 的配置,並把資料實時同步到 kafka 中。
-
修改 conf/canal.properties 檔案
canal.zkServers=hadoop02:2181,hadoop03:2181,hadoop04:2181
canal.serverMode=kafka
canal.mq.servers=hadoop02:9092,hadoop03:9092,hadoop04:9092
-
配置 conf/example/example.instance
canal.instance.mysql.slaveId=790/兩臺canalserver的slaveID唯一
canal.mq.topic=canal_log//指定將資料傳送到kafka的topic
資料同步方案總結
講完了 Canal 工具,現在給大家簡單總結下目前常見的資料採集工具,不會涉及架構知識,只是簡單總結,讓大家有個印象。
常見的資料採集工具有:DataX、Flume、Canal、Sqoop、LogStash 等。
DataX (處理離線資料)
DataX 是阿里巴巴開源的一個異構資料來源離線同步工具,異構資料來源離線同步指的是將源端資料同步到目的端,但是端與端的資料來源型別種類繁多,在沒有 DataX 之前,端與端的鏈路將組成一個複雜的網狀結構,非常零散無法把同步核心邏輯抽象出來。
為了解決異構資料來源同步問題,DataX 將複雜的網狀的同步鏈路變成了星型資料鏈路,DataX 作為中間傳輸載體負責連線各種資料來源。
所以,當需要接入一個新的資料來源的時候,只需要將此資料來源對接到 DataX,就可以跟已有的資料來源做到無縫資料同步。
DataX本身作為離線資料同步框架,採用Framework+plugin架構構建。將資料來源讀取和寫入抽象成為Reader/Writer外掛,納入到整個同步框架中。
-
Reader: 它為資料採集模組,負責採集資料來源的資料,將資料傳送給Framework。 -
Writer: 它為資料寫入模組,負責不斷向Framework取資料,並將資料寫入到目的端。 -
Framework:它用於連線Reader和Writer,作為兩者的資料傳輸通道,並處理緩衝、併發、資料轉換等問題。
DataX的核心架構如下圖:
核心模組介紹:
-
DataX完成單個數據同步的作業,我們把它稱之為Job,DataX接收到一個Job之後,將啟動一個程序來完成整個作業同步過程。 -
DataX Job啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於併發執行。 -
切分多個Task之後,DataX Job會呼叫Scheduler模組,根據配置的併發資料量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的併發執行完畢分配好的所有Task,預設單個任務組的併發數量為5。 -
每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader->Channel->Writer的執行緒來完成任務同步工作。 -
DataX作業執行完成之後,Job監控並等待多個TaskGroup模組任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出。
Flume(處理實時資料)
Flume主要應用的場景是同步日誌資料,主要包含三個元件:Source、Channel、Sink。
Flume最大的優點就是官網提供了豐富的Source、Channel、Sink,根據不同的業務需求,我們可以在官網查詢相關配置。另外,Flume還提供了自定義這些元件的介面。
Logstash(處理離線資料)
Logstash就是一根具備實時資料傳輸能力的管道,負責將資料資訊從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以讓你根據自己的需求在中間加上過濾網,Logstash提供了很多功能強大的過濾網來滿足各種應用場景。
Logstash是由JRuby編寫,使用基於訊息的簡單架構,在JVM上執行。在管道內的資料流稱之為event,它分為inputs階段、filters階段、outputs階段。
Sqoop(處理離線資料)
Sqoop是Hadoop和關係型資料庫之間傳送資料的一種工具,它是用來從關係型資料庫如MySQL到Hadoop的HDFS從Hadoop檔案系統匯出資料到關係型資料庫。Sqoop底層用的還是MapReducer,用的時候一定要注意資料傾斜。
總結
老劉本篇文章主要講述了Canal工具的核心知識點及其資料採集工具的對比,其中資料採集工具只是大致講了講概念和應用,目的也是讓大家有個印象。老劉敢做保證看完這篇文章基本等於入門,剩下的就是練習了。
好啦,同步mysql增量資料的工具Canal的內容就講完了,儘管當前水平可能不及各位大佬,但老劉會努力變得更加優秀,讓各位小夥伴自學從此不求人!
如果有相關問題,聯絡公眾號:努力的老劉。文章都看到這了,點贊關注支援一波!