1. 程式人生 > 其它 >【Canal】01 入門 & Kafka模式

【Canal】01 入門 & Kafka模式

什麼是Canal (卡耐爾) ?

Canal 是用 Java 開發的基於資料庫增量日誌解析,提供增量資料訂閱&消費的中介軟體 原理基於MySQL的binlog從庫監聽

一、MySQL環境配置

1、更改MySQL配置 (my.ini / my.cnf):

[mysqld]

# 主庫id標識
server-id=1

# 開啟binlog日誌
log-bin=mysql-bin

# 日誌格式型別
binlog_format=row

# (可選)宣告只對哪個庫進行日誌輸出
binlog-do-db=gmall-2021

2、測試用例

沒有表就建立一個測試用的表:

CREATE
TABLE user_info(   `id` VARCHAR(255),   `name` VARCHAR(255),   `sex` VARCHAR(255) );

3、監聽的賬號

和主從複製一樣,需要提供一個從庫監聽的賬號:

CREATE USER 'canal'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'canal'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;

-- GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

二、安裝Canal

Linux平臺:

# 解壓方式一
tar -zxvf canal.deployer-1.1.2.tar.gz

mkdir canal.deployer-1.1.2

mv bin canal.deployer-1.1.2/
mv logs canal.deployer-1.1.2/
mv lib canal.deployer-1.1.2/
mv conf canal.deployer-1.1.2/

# 解壓方式二
mkdir ~/canal-1.1.2
tar -zxvf canal.deployer-1.1.2.tar.gz -C ~/canal-1.1.2/

Windows平臺:

新建一個canal的目錄,然後開啟目錄
把壓縮包內容解壓到目錄中即可

通用配置操作:

案例只是為了演示,單機執行的方式進行配置

# 備份 instance.properties檔案
cd ~/canal-1.1.2/example
cp instance.properties instance.properties.bak

編輯example下的例項檔案

vim ~/canal-1.1.2/conf/example/instance.properties

關鍵引數項:

# 偽裝從庫的id,不要和主庫id一致即可
canal.instance.mysql.slaveId=20 
# 主庫IP地址和埠號
canal.instance.master.address=192.168.2.225:3308
# 主庫開設的監聽賬號
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
# 字符集
canal.instance.connectionCharset=UTF-8
# 預設監聽的db? 
canal.instance.defaultDatabaseName=canal

三、啟動,關閉Canal

restart.sh
startup.bat 
startup.sh
stop.sh

# windows 平臺直接執行 startup.bat 
# 關閉就是直接關閉終端視窗即可
startup.bat 

# linux 平臺 
startup.sh  # 啟動
restart.sh # 重啟
stop.sh # 停止

四、建立Canal監聽客戶端:

canal沒有提供預設的終端輸出,強制要求客戶端監聽日誌訊息:

這裡使用Java做一個客戶端來檢視訊息

建立普通Maven專案,引入兩個依賴

<dependencies>
     <dependency>
         <groupId>com.alibaba.otter</groupId>
         <artifactId>canal.client</artifactId>
         <version>1.1.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

客戶端類:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {

        //TODO 獲取連線
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");

        while (true) {

            //TODO 連線
            canalConnector.connect();

            //TODO 訂閱資料庫
            canalConnector.subscribe("canal.*");

            //TODO 獲取資料
            Message message = canalConnector.get(100);

            //TODO 獲取Entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            //TODO 判斷集合是否為空,如果為空,則等待一會繼續拉取資料
            if (entries.size() <= 0) {
                System.out.println("當次抓取沒有資料,休息一會。。。。。。");
                Thread.sleep(1000);
            } else {

                //TODO 遍歷entries,單條解析
                for (CanalEntry.Entry entry : entries) {

                    //1.獲取表名
                    String tableName = entry.getHeader().getTableName();

                    //2.獲取型別
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    //3.獲取序列化後的資料
                    ByteString storeValue = entry.getStoreValue();

                    //4.判斷當前entryType型別是否為ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {

                        //5.反序列化資料
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                        //6.獲取當前事件的操作型別
                        CanalEntry.EventType eventType = rowChange.getEventType();

                        //7.獲取資料集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();

                        //8.遍歷rowDataList,並列印資料集
                        for (CanalEntry.RowData rowData : rowDataList) {

                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }

                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }

                            //資料列印
                            System.out.println("Table:" + tableName +
                                    ",EventType:" + eventType +
                                    ",Before:" + beforeData +
                                    ",After:" + afterData);
                        }
                    } else {
                        System.out.println("當前操作型別為:" + entryType);
                    }
                }
            }
        }
    }
}

開啟後,想指定的表中寫入資料:

客戶端輸出訊息:

當次抓取沒有資料,休息一會。。。。。。
當次抓取沒有資料,休息一會。。。。。。
當次抓取沒有資料,休息一會。。。。。。
當次抓取沒有資料,休息一會。。。。。。
當前操作型別為:TRANSACTIONBEGIN
Table:user_info,EventType:INSERT,Before:{},After:{"sex":"","name":"張三","id":"1"}
當前操作型別為:TRANSACTIONEND
當前操作型別為:TRANSACTIONBEGIN
Table:user_info,EventType:INSERT,Before:{},After:{"sex":"","name":"張三","id":"2"}
當前操作型別為:TRANSACTIONEND
當次抓取沒有資料,休息一會。。。。。。

五、Kafka模式

修改 canal.properties
# 指定輸出模式為kafka
canal.serverMode = kafka

# kafka叢集地址,如單機,則寫一個即可
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

修改 example/instance.properties

# mq config
# 指定Topic名稱 和 分割槽數量
canal.mq.topic=canal_test
canal.mq.partitionsNum=1

重啟canal以載入配置資訊

啟動Kafka消費者來檢視是否執行:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test

執行插入SQL:

INSERT INTO user_info VALUES('1001','zhangsan','male'),('1002','lisi','female');

Kafka控制檯:

{
  "data": [
    {
      "id": "1001",
      "name": "zhangsan",
      "sex": "male"
    },
    {
      "id": "1002",
      "name": "lisi",
      "sex": "female"
    }
  ],
  "database": "gmall-2021",
  "es": 1639360729000,
  "id": 1,
  "isDdl": false,
  "mysqlType": {
    "id": "varchar(255)",
    "name": "varchar(255)",
    "sex": "varchar(255)"
  },
  "old": "null",
  "sql": "",
  "sqlType": {
    "id": 12,
    "name": 12,
    "sex": 12
  },
  "table": "user_info",
  "ts": 1639361038454,
  "type": "INSERT"
}