1. 程式人生 > >基於Canal和Kafka實現MySQL的Binlog近實時同步

基於Canal和Kafka實現MySQL的Binlog近實時同步

## 前提 近段時間,業務系統架構基本完備,資料層面的建設比較薄弱,因為筆者目前工作重心在於搭建一個小型的資料平臺。優先順序比較高的一個任務就是需要近實時同步業務系統的資料(包括儲存、更新或者軟刪除)到一個另一個數據源,持久化之前需要清洗資料並且構建一個相對合理的便於後續業務資料統計、標籤系統構建等擴充套件功能的資料模型。基於當前團隊的資源和能力,優先調研了`Alibaba`開源中介軟體`Canal`的使用。 ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004326014-2094794575.png) 這篇文章簡單介紹一下如何快速地搭建一套`Canal`相關的元件。 ## 關於Canal ### 簡介 下面的簡介和下一節的原理均來自於[Canal](https://github.com/alibaba/canal)專案的`README`: ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004331754-32441425.png) `Canal[kə'næl]`,譯意為水道/管道/溝渠,主要用途是基於`MySQL`資料庫增量日誌解析,提供增量資料訂閱和消費。早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務`trigger`獲取增量變更。從 2010 年開始,業務逐步嘗試資料庫日誌解析獲取增量變更進行同步,由此衍生出了大量的資料庫增量訂閱和消費業務。 基於日誌增量訂閱和消費的業務包括: - 資料庫映象 - 資料庫實時備份 - 索引構建和實時維護(拆分異構索引、倒排索引等) - 業務`Cache`重新整理 - 帶業務邏輯的增量資料處理 ### Canal的工作原理 `MySQL`主備複製原理: ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004339159-1063626828.png) - `MySQL`的`Master`例項將資料變更寫入二進位制日誌(`binary log`,其中記錄叫做二進位制日誌事件`binary log events`,可以通過`show binlog events`進行檢視) - `MySQL`的`Slave`例項將`master`的`binary log events`拷貝到它的中繼日誌(`relay log`) - `MySQL`的`Slave`例項重放`relay log`中的事件,將資料變更反映它到自身的資料 `Canal`的工作原理如下: - `Canal`模擬`MySQL Slave`的互動協議,偽裝自己為`MySQL Slave`,向`MySQL Master`傳送`dump`協議 - `MySQL Master`收到`dump`請求,開始推送`binary log`給`Slave`(即`Canal`) - `Canal`解析`binary log`物件(原始為`byte`流),並且可以通過聯結器傳送到對應的訊息佇列等中介軟體中 ### 關於Canal的版本和部件 截止筆者開始編寫本文的時候(`2020-03-05`),`Canal`的最新發布版本是`v1.1.5-alpha-1`(`2019-10-09`釋出的),最新的正式版是`v1.1.4`(`2019-09-02`釋出的)。其中,`v1.1.4`主要添加了鑑權、監控的功能,並且做了一些列的效能優化,此版本整合的聯結器是`Tcp`、`Kafka`和`RockerMQ`。而`v1.1.5-alpha-1`版本已經新增了`RabbitMQ`聯結器,但是此版本的`RabbitMQ`聯結器暫時不能定義連線`RabbitMQ`的埠號,不過此問題已經在`master`分支中修復(具體可以參看原始碼中的`CanalRabbitMQProducer`類的提交記錄)。換言之,`v1.1.4`版本中目前能使用的內建聯結器只有`Tcp`、`Kafka`和`RockerMQ`三種,如果想嚐鮮使用`RabbitMQ`聯結器,可以選用下面的兩種方式之一: - 選用`v1.1.5-alpha-1`版本,但是無法修改`RabbitMQ`的`port`屬性,預設為`5672`。 - 基於`master`分支自行構建`Canal`。 目前,`Canal`專案的活躍度比較高,但是考慮到功能的穩定性問題,筆者建議選用穩定版本在生產環境中實施,當前可以選用`v1.1.4`版本,**本文的例子用選用的就是`v1.1.4`版本,配合`Kafka`聯結器使用**。`Canal`主要包括三個核心部件: - `canal-admin`:後臺管理模組,提供面向`WebUI`的`Canal`管理能力。 - `canal-adapter`:介面卡,增加客戶端資料落地的適配及啟動功能,包括`REST`、日誌介面卡、關係型資料庫的資料同步(表對錶同步)、`HBase`資料同步、`ES`資料同步等等。 - `canal-deployer`:釋出器,核心功能所在,包括`binlog`解析、轉換和傳送報文到聯結器中等等功能都由此模組提供。 一般情況下,`canal-deployer`部件是必須的,其他兩個部件按需選用即可。 ## 部署所需的中介軟體 搭建一套可以用的元件需要部署`MySQL`、`Zookeeper`、`Kafka`和`Canal`四個中介軟體的例項,下面簡單分析一下部署過程。選用的虛擬機器系統是`CentOS7`。 ### 安裝MySQL 為了簡單起見,選用`yum`源安裝(官方連結是`https://dev.mysql.com/downloads/repo/yum`): ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004350233-1474649916.png) ::: info mysql80-community-release-el7-3雖然包名帶了mysql80關鍵字,其實已經集成了MySQL主流版本5.6、5.7和8.x等等的最新安裝包倉庫 ::: 選用的是最新版的`MySQL8.x`社群版,下載`CentOS7`適用的`rpm包`: ```shell cd /data/mysql wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm // 下載完畢之後 sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm ``` 此時列舉一下`yum`倉庫裡面的`MySQL`相關的包: ```shell [root@localhost mysql]# yum repolist all | grep mysql mysql-cluster-7.5-community/x86_64 MySQL Cluster 7.5 Community disabled mysql-cluster-7.5-community-source MySQL Cluster 7.5 Community - disabled mysql-cluster-7.6-community/x86_64 MySQL Cluster 7.6 Community disabled mysql-cluster-7.6-community-source MySQL Cluster 7.6 Community - disabled mysql-cluster-8.0-community/x86_64 MySQL Cluster 8.0 Community disabled mysql-cluster-8.0-community-source MySQL Cluster 8.0 Community - disabled mysql-connectors-community/x86_64 MySQL Connectors Community enabled: 141 mysql-connectors-community-source MySQL Connectors Community - disabled mysql-tools-community/x86_64 MySQL Tools Community enabled: 105 mysql-tools-community-source MySQL Tools Community - Sourc disabled mysql-tools-preview/x86_64 MySQL Tools Preview disabled mysql-tools-preview-source MySQL Tools Preview - Source disabled mysql55-community/x86_64 MySQL 5.5 Community Server disabled mysql55-community-source MySQL 5.5 Community Server - disabled mysql56-community/x86_64 MySQL 5.6 Community Server disabled mysql56-community-source MySQL 5.6 Community Server - disabled mysql57-community/x86_64 MySQL 5.7 Community Server disabled mysql57-community-source MySQL 5.7 Community Server - disabled mysql80-community/x86_64 MySQL 8.0 Community Server enabled: 161 mysql80-community-source MySQL 8.0 Community Server - disabled ``` 編輯`/etc/yum.repos.d/mysql-community.repo`檔案(`[mysql80-community]`塊中`enabled設定為1`,其實預設就是這樣子,不用改,如果要選用`5.x`版本則需要修改對應的塊): ```shell [mysql80-community] name=MySQL 8.0 Community Server baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/ enabled=1 gpgcheck=1 gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql ``` 然後安裝`MySQL`服務: ```shell sudo yum install mysql-community-server ``` 這個過程比較漫長,因為需要下載和安裝5個`rpm`安裝包(或者是所有安裝包組合的壓縮包`mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar`)。如果網路比較差,也可以直接從官網手動下載後安裝: ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004358763-1901008131.png) ```shell // 下載下面5個rpm包 common --> libs --> libs-compat --> client --> server mysql-community-common mysql-community-libs mysql-community-libs-compat mysql-community-client mysql-community-server // 強制安裝 rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps ``` 安裝完畢之後,啟動`MySQL`服務,然後搜尋`MySQL`服務的`root`賬號的臨時密碼用於首次登陸(`mysql -u root -p`): ```shell // 啟動服務,關閉服務就是service mysqld stop service mysqld start // 檢視臨時密碼 cat /var/log/mysqld.log [root@localhost log]# cat /var/log/mysqld.log 2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780 2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li 2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834 // 登入臨時root使用者,使用臨時密碼 [root@localhost log]# mysql -u root -p ``` 接下來做下面的操作: - 修改`root`使用者的密碼:`ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';`(注意密碼規則必須包含大小寫字母、數字和特殊字元) - 更新`root`的`host`,切換資料庫`use mysql;`,指定`host`為`%`以便可以讓其他伺服器遠端訪問`UPDATE USER SET HOST = '%' WHERE USER = 'root';` - 賦予`'root'@'%'`使用者,所有許可權,執行`GRANT ALL PRIVILEGES ON *.* TO 'root'@'%';` - 改變`root'@'%`使用者的密碼校驗規則以便可以使用`Navicat`等工具訪問:`ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';` ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004404405-1318949422.png) 操作完成之後,就可以使用`root`使用者遠端訪問此虛擬機器上的`MySQL`服務。最後確認是否開啟了`binlog`(注意一點是`MySQL8.x`預設開啟`binlog`)`SHOW VARIABLES LIKE '%bin%';`: ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004407681-279772349.png) 最後在`MySQL`的`Shell`執行下面的命令,新建一個使用者名稱`canal`密碼為`QWqw12!@`的新使用者,賦予`REPLICATION SLAVE`和 `REPLICATION CLIENT`許可權: ```shell CREATE USER canal IDENTIFIED BY 'QWqw12!@'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES; ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@'; ``` 切換回去`root`使用者,建立一個數據庫`test`: ```sql CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`; ``` ### 安裝Zookeeper `Canal`和`Kafka`叢集都依賴於`Zookeeper`做服務協調,為了方便管理,一般會獨立部署`Zookeeper`服務或者`Zookeeper`叢集。筆者這裡選用`2020-03-04`釋出的`3.6.0`版本: ```shell midkr /data/zk # 建立資料目錄 midkr /data/zk/data cd /data/zk wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz cd apache-zookeeper-3.6.0-bin/conf cp zoo_sample.cfg zoo.cfg && vim zoo.cfg ``` 把`zoo.cfg`檔案中的`dataDir`設定為`/data/zk/data`,然後啟動`Zookeeper`: ```shell [root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start /usr/bin/java ZooKeeper JMX enabled by default Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED ``` 這裡注意一點,要啟動此版本的`Zookeeper`服務必須本地安裝好`JDK8+`,這一點需要自行處理。啟動的預設埠是`2181`,啟動成功後的日誌如下: ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004415862-1789602852.png) ### 安裝Kafka `Kafka`是一個高效能分散式訊息佇列中介軟體,它的部署依賴於`Zookeeper`。筆者在此選用`2.4.0`並且`Scala`版本為`2.13`的安裝包: ```shell mkdir /data/kafka mkdir /data/kafka/data wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz tar -zxvf kafka_2.13-2.4.0.tgz ``` 由於解壓後`/data/kafka/kafka_2.13-2.4.0/config/server.properties`配置中對應的`zookeeper.connect=localhost:2181`已經符合需要,不必修改,需要修改日誌檔案的目錄`log.dirs`為`/data/kafka/data`。然後啟動`Kafka`服務: ```shell sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties ``` ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004421196-675277977.png) 這樣啟動一旦退出控制檯就會結束`Kafka`程序,可以新增`-daemon`引數用於控制`Kafka`程序後臺不掛斷執行。 ```shell sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties ``` ### 安裝和使用Canal 終於到了主角登場,這裡選用`Canal`的`v1.1.4`穩定釋出版,只需要下載`deployer`模組: ```shell mkdir /data/canal cd /data/canal # 這裡注意一點,Github在國內被牆,下載速度極慢,可以先用其他下載工具下載完再上傳到伺服器中 wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz tar -zxvf canal.deployer-1.1.4.tar.gz ``` 解壓後的目錄如下: ```shell - bin # 運維指令碼 - conf # 配置檔案 canal_local.properties # canal本地配置,一般不需要動 canal.properties # canal服務配置 logback.xml # logback日誌配置 metrics # 度量統計配置 spring # spring-例項配置,主要和binlog位置計算、一些策略配置相關,可以在canal.properties選用其中的任意一個配置檔案 example # 例項配置資料夾,一般認為單個數據庫對應一個獨立的例項配置資料夾 instance.properties # 例項配置,一般指單個數據庫的配置 - lib # 服務依賴包 - logs # 日誌檔案輸出目錄 ``` 在開發和測試環境建議把`logback.xml`的日誌級別修改為`DEBUG`方便定位問題。這裡需要關注`canal.properties`和`instance.properties`兩個配置檔案。`canal.properties`檔案中,需要修改: - 去掉`canal.instance.parser.parallelThreadSize = 16`這個配置項的**註釋**,也就是啟用此配置項,和例項解析器的執行緒數相關,不配置會表現為阻塞或者不進行解析。 - `canal.serverMode`配置項指定為`kafka`,可選值有`tcp`、`kafka`和`rocketmq`(`master`分支或者最新的的`v1.1.5-alpha-1`版本,可以選用`rabbitmq`),預設是`kafka`。 - `canal.mq.servers`配置需要指定為`Kafka`服務或者叢集`Broker`的地址,這裡配置為`127.0.0.1:9092`。 > canal.mq.servers在不同的canal.serverMode有不同的意義。 > kafka模式下,指Kafka服務或者叢集Broker的地址,也就是bootstrap.servers > rocketmq模式下,指NameServer列表 > rabbitmq模式下,指RabbitMQ服務的Host和Port 其他配置項可以參考下面兩個官方`Wiki`的連結: - [Canal-Kafka-RocketMQ-QuickStart](https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart) - [AdminGuide](https://github.com/alibaba/canal/wiki/AdminGuide) `instance.properties`一般指一個數據庫例項的配置,`Canal`架構支援一個`Canal`服務例項,處理多個數據庫例項的`binlog`非同步解析。`instance.properties`需要修改的配置項主要包括: - `canal.instance.mysql.slaveId`需要配置一個和`Master`節點的服務`ID`完全不同的值,這裡筆者配置為`654321`。 - 配置資料來源例項,包括地址、使用者、密碼和目標資料庫: - `canal.instance.master.address`,這裡指定為`127.0.0.1:3306`。 - `canal.instance.dbUsername`,這裡指定為`canal`。 - `canal.instance.dbPassword`,這裡指定為`QWqw12!@`。 - 新增`canal.instance.defaultDatabaseName`,這裡指定為`test`(需要在`MySQL`中建立一個`test`資料庫,見前面的流程)。 - `Kafka`相關配置,這裡暫時使用靜態`topic`和單個`partition`: - `canal.mq.topic`,這裡指定為`test`,**也就是解析完的`binlog`結構化資料會發送到`Kafka`的命名為`test`的`topic`中**。 - `canal.mq.partition`,這裡指定為`0`。 配置工作做好之後,可以啟動`Canal`服務: ```shell sh /data/canal/bin/startup.sh # 檢視服務日誌 tail -100f /data/canal/logs/canal/canal # 檢視例項日誌 -- 一般情況下,關注例項日誌即可 tail -100f /data/canal/logs/example/example.log ``` 啟動正常後,見例項日誌如下: ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004428986-1621806506.png) 在`test`資料庫建立一個訂單表,並且執行幾個簡單的`DML`: ```sql use `test`; CREATE TABLE `order` ( id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵', order_id VARCHAR(64) NOT NULL COMMENT '訂單ID', amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間', UNIQUE uniq_order_id (`order_id`) ) COMMENT '訂單表'; INSERT INTO `order`(order_id, amount) VALUES ('10086', 999); UPDATE `order` SET amount = 10087 WHERE order_id = '10086'; DELETE FROM `order` WHERE order_id = '10086'; ``` 這個時候,可以利用`Kafka`的`kafka-console-consumer`或者`Kafka Tools`檢視`test`這個`topic`的資料: ```shell sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test ``` ![](https://img2020.cnblogs.com/blog/1412331/202003/1412331-20200313004434294-1545715265.png) 具體的資料如下: ```shell // test資料庫建庫指令碼 {"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"} // order表建表DDL {"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',\n order_id VARCHAR(64) NOT NULL COMMENT '訂單ID',\n amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',\n create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',\n UNIQUE uniq_order_id (`order_id`)\n) COMMENT '訂單表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"} // INSERT {"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"} // UPDATE {"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"} // DELETE {"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"} ``` 可見`Kafka`的名為`test`的`topic`已經寫入了對應的結構化`binlog`事件資料,可以編寫消費者監聽`Kafka`對應的`topic`然後對獲取到的資料進行後續處理。 ## 小結 這篇文章大部分篇幅用於介紹其他中介軟體是怎麼部署的,這個問題側面說明了`Canal`本身部署並不複雜,它的配置檔案屬性項比較多,但是實際上需要自定義和改動的配置項是比較少的,也就是說明了它的運維成本和學習成本並不高。後面會分析基於結構化`binlog`事件做`ELT`和持久化相關工作以及`Canal`的生產環境可用級別`HA`叢集的搭建。 參考資料: - [A Quick Guide to Using the MySQL Yum Repository](https://dev.mysql.com/doc/mysql-yum-repo-quick-guide/en/) - [Canal](https://github.com/alibaba/canal) ## 個人部落格 - [Throwable's Blog](http://www.throwable.club/2020/03/07/canal-kafka-mysql-binlog-sync-guide) (本文完 c-3-d e-a-202