【Canal】01 入門 & Kafka模式
阿新 • • 發佈:2022-01-26
什麼是Canal (卡耐爾) ?
Canal 是用 Java 開發的基於資料庫增量日誌解析,提供增量資料訂閱&消費的中介軟體 原理基於MySQL的binlog從庫監聽一、MySQL環境配置
1、更改MySQL配置 (my.ini / my.cnf):
[mysqld] # 主庫id標識 server-id=1 # 開啟binlog日誌 log-bin=mysql-bin # 日誌格式型別 binlog_format=row # (可選)宣告只對哪個庫進行日誌輸出 binlog-do-db=gmall-2021
2、測試用例
沒有表就建立一個測試用的表:
CREATETABLE user_info( `id` VARCHAR(255), `name` VARCHAR(255), `sex` VARCHAR(255) );
3、監聽的賬號
和主從複製一樣,需要提供一個從庫監聽的賬號:
CREATE USER 'canal'@'%' IDENTIFIED BY '123456'; GRANT ALL ON *.* TO 'canal'@'%' WITH GRANT OPTION; FLUSH PRIVILEGES; -- GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
二、安裝Canal
Linux平臺:
# 解壓方式一 tar -zxvf canal.deployer-1.1.2.tar.gz mkdir canal.deployer-1.1.2 mv bin canal.deployer-1.1.2/ mv logs canal.deployer-1.1.2/ mv lib canal.deployer-1.1.2/ mv conf canal.deployer-1.1.2/ # 解壓方式二 mkdir ~/canal-1.1.2 tar -zxvf canal.deployer-1.1.2.tar.gz -C ~/canal-1.1.2/
Windows平臺:
新建一個canal的目錄,然後開啟目錄
把壓縮包內容解壓到目錄中即可
通用配置操作:
案例只是為了演示,單機執行的方式進行配置
# 備份 instance.properties檔案 cd ~/canal-1.1.2/example cp instance.properties instance.properties.bak
編輯example下的例項檔案
vim ~/canal-1.1.2/conf/example/instance.properties
關鍵引數項:
# 偽裝從庫的id,不要和主庫id一致即可 canal.instance.mysql.slaveId=20 # 主庫IP地址和埠號 canal.instance.master.address=192.168.2.225:3308 # 主庫開設的監聽賬號 canal.instance.dbUsername=canal canal.instance.dbPassword=123456 # 字符集 canal.instance.connectionCharset=UTF-8 # 預設監聽的db? canal.instance.defaultDatabaseName=canal
三、啟動,關閉Canal
restart.sh startup.bat startup.sh stop.sh # windows 平臺直接執行 startup.bat # 關閉就是直接關閉終端視窗即可 startup.bat # linux 平臺 startup.sh # 啟動 restart.sh # 重啟 stop.sh # 停止
四、建立Canal監聽客戶端:
canal沒有提供預設的終端輸出,強制要求客戶端監聽日誌訊息:這裡使用Java做一個客戶端來檢視訊息
建立普通Maven專案,引入兩個依賴
<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> </dependencies>
客戶端類:
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException { //TODO 獲取連線 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); while (true) { //TODO 連線 canalConnector.connect(); //TODO 訂閱資料庫 canalConnector.subscribe("canal.*"); //TODO 獲取資料 Message message = canalConnector.get(100); //TODO 獲取Entry集合 List<CanalEntry.Entry> entries = message.getEntries(); //TODO 判斷集合是否為空,如果為空,則等待一會繼續拉取資料 if (entries.size() <= 0) { System.out.println("當次抓取沒有資料,休息一會。。。。。。"); Thread.sleep(1000); } else { //TODO 遍歷entries,單條解析 for (CanalEntry.Entry entry : entries) { //1.獲取表名 String tableName = entry.getHeader().getTableName(); //2.獲取型別 CanalEntry.EntryType entryType = entry.getEntryType(); //3.獲取序列化後的資料 ByteString storeValue = entry.getStoreValue(); //4.判斷當前entryType型別是否為ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { //5.反序列化資料 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //6.獲取當前事件的操作型別 CanalEntry.EventType eventType = rowChange.getEventType(); //7.獲取資料集 List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); //8.遍歷rowDataList,並列印資料集 for (CanalEntry.RowData rowData : rowDataList) { JSONObject beforeData = new JSONObject(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } JSONObject afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } //資料列印 System.out.println("Table:" + tableName + ",EventType:" + eventType + ",Before:" + beforeData + ",After:" + afterData); } } else { System.out.println("當前操作型別為:" + entryType); } } } } } }
開啟後,想指定的表中寫入資料:
客戶端輸出訊息:
當次抓取沒有資料,休息一會。。。。。。 當次抓取沒有資料,休息一會。。。。。。 當次抓取沒有資料,休息一會。。。。。。 當次抓取沒有資料,休息一會。。。。。。 當前操作型別為:TRANSACTIONBEGIN Table:user_info,EventType:INSERT,Before:{},After:{"sex":"男","name":"張三","id":"1"} 當前操作型別為:TRANSACTIONEND 當前操作型別為:TRANSACTIONBEGIN Table:user_info,EventType:INSERT,Before:{},After:{"sex":"男","name":"張三","id":"2"} 當前操作型別為:TRANSACTIONEND 當次抓取沒有資料,休息一會。。。。。。
五、Kafka模式
修改 canal.properties# 指定輸出模式為kafka canal.serverMode = kafka # kafka叢集地址,如單機,則寫一個即可 canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
修改 example/instance.properties
# mq config # 指定Topic名稱 和 分割槽數量 canal.mq.topic=canal_test canal.mq.partitionsNum=1
重啟canal以載入配置資訊
啟動Kafka消費者來檢視是否執行:
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test
執行插入SQL:
INSERT INTO user_info VALUES('1001','zhangsan','male'),('1002','lisi','female');
Kafka控制檯:
{ "data": [ { "id": "1001", "name": "zhangsan", "sex": "male" }, { "id": "1002", "name": "lisi", "sex": "female" } ], "database": "gmall-2021", "es": 1639360729000, "id": 1, "isDdl": false, "mysqlType": { "id": "varchar(255)", "name": "varchar(255)", "sex": "varchar(255)" }, "old": "null", "sql": "", "sqlType": { "id": 12, "name": 12, "sex": 12 }, "table": "user_info", "ts": 1639361038454, "type": "INSERT" }