1. 程式人生 > 實用技巧 >全文搜尋引擎 Elasticsearch (四)MySQL如何實時同步資料到ES

全文搜尋引擎 Elasticsearch (四)MySQL如何實時同步資料到ES

canal簡介

canal主要用途是對MySQL資料庫增量日誌進行解析,提供增量資料的訂閱和消費,簡單說就是可以對MySQL的增量資料進行實時同步,支援同步到MySQL、Elasticsearch、HBase等資料儲存中去。

canal工作原理

  • canal 模擬 MySQL slave 的互動協議,偽裝自己為 MySQL slave ,向 MySQL master 傳送 dump 協議
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 物件(原始為 byte 流)



canal使用

元件下載

  • canal的各個元件的用途各不相同,下面分別介紹下:

    • canal-server(canal-deploy):可以直接監聽MySQL的binlog,把自己偽裝成MySQL的從庫,只負責接收資料,並不做處理。
    • canal-adapter:相當於canal的客戶端,會從canal-server中獲取資料,然後對資料進行同步,可以同步到MySQL、Elasticsearch和HBase等儲存中去。
    • canal-admin:為canal提供整體配置管理、節點運維等面向運維的功能,提供相對友好的WebUI操作介面,方便更多使用者快速和安全的操作。
  • 由於不同版本的MySQL、Elasticsearch和canal會有相容性問題,所以我們先對其使用版本做個約定。

應用版本
MySQL 3306 5.7
Elasticsearch 9200 7.6.2
Kibanba 5601 7.6.2
canal-server 11111 1.1.15
canal-adapter 8081 1.1.15
canal-admin 8089 1.1.15

MySQL配置

  • 由於canal是通過訂閱MySQL的binlog來實現資料同步的,所以我們需要開啟MySQL的binlog寫入功能,並設定binlog-format為ROW模式,我的配置檔案為/mydata/mysql/conf/my.cnf,改為如下內容即可;
[mysqld]
## 設定server_id,同一區域網中需要唯一
server_id=101 
## 指定不需要同步的資料庫名稱
binlog-ignore-db=mysql  
## 開啟二進位制日誌功能
log-bin=mall-mysql-bin  
## 設定二進位制日誌使用記憶體大小(事務)
binlog_cache_size=1M  
## 設定使用的二進位制日誌格式(mixed,statement,row)
binlog_format=row  
## 二進位制日誌過期清理時間。預設值為0,表示不自動清理。
expire_logs_days=7  
## 跳過主從複製中遇到的所有錯誤或指定型別的錯誤,避免slave端複製中斷。
## 如:1062錯誤是指一些主鍵重複,1032錯誤是因為主從資料庫資料不一致
slave_skip_errors=1062  

  

  • 配置完成後需要重新啟動MySQL,重啟成功後通過如下命令檢視binlog是否啟用;
show variables like '%log_bin%'

  

  • 接下來需要建立一個擁有從庫許可權的賬號,用於訂閱binlog,這裡建立的賬號為canal:canal

  

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

  • 建立好測試用的資料庫canal-test,之後建立一張商品表product,建表語句如下。
CREATE TABLE `product`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `price` decimal(10, 2) NULL DEFAULT NULL,
  `pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
 

canal-server使用

  • 將我們下載好的壓縮包canal.deployer-1.1.5-SNAPSHOT.tar.gz上傳到Linux伺服器,
  • 然後解壓到指定目錄/mydata/canal-server
  • 可使用如下命令解壓 tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz
  • 解壓完成後目錄結構如下;

  

├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── canal_local.properties
│ ├── canal.properties
│ └── example
│   └── instance.properties
├── lib
├── logs
│ ├── canal
│ │ └── canal.log
│ └── example
│     ├── example.log
│     └── example.log
└── plugin
  • 修改配置檔案conf/example/instance.properties,按如下配置即可,主要是修改資料庫相關配置;
# 需要同步資料的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用於同步資料的資料庫賬號
canal.instance.dbUsername=canal
# 用於同步資料的資料庫密碼
canal.instance.dbPassword=canal
# 資料庫連線編碼
canal.instance.connectionCharset = UTF-8
# 需要訂閱binlog的表過濾正則表示式
canal.instance.filter.regex=.*\\..*

  • 使用startup.sh指令碼啟動canal-server服務;
sh bin/startup.sh
  • 如果想要停止canal-server服務可以使用如下命令。
sh bin/stop.sh

canal-adapter使用

  • 將我們下載好的壓縮包canal.adapter-1.1.5-SNAPSHOT.tar.gz上傳到Linux伺服器,然後解壓到指定目錄/mydata/canal-adpter,解壓完成後目錄結構如下;
├── bin
│ ├── adapter.pid
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ ├── es6
│ ├── es7
│ │ ├── biz_order.yml
│ │ ├── customer.yml
│ │ └── product.yml
│ ├── hbase
│ ├── kudu
│ ├── logback.xml
│ ├── META-INF
│ │ └── spring.factories
│ └── rdb
├── lib
├── logs
│ └── adapter
│     └── adapter.log
└── plugin
 
  • 修改配置檔案conf/application.yml,按如下配置即可,主要是修改canal-server配置、資料來源配置和客戶端介面卡配置;
canal.conf:
  mode: tcp # 客戶端的模式,可選tcp kafka rocketMQ
  flatMessage: true # 扁平message開關, 是否以json字串形式投遞資料, 僅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 對應叢集模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批數量
  retries: 0 # 重試次數, -1為無限重試
  timeout: # 同步超時時間, 單位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #設定canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources: # 源資料庫配置
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters: # 介面卡列表
  - instance: example # canal例項名或者MQ topic名
    groups: # 分組列表
    - groupId: g1 # 分組id, 如果是MQ模式將用到該值
      outerAdapters:
      - name: logger # 日誌列印介面卡
      - name: es7 # ES同步介面卡
        hosts: 127.0.0.1:9200 # ES連線地址
        properties:
          mode: rest # 模式可選transport(9300) 或者 rest(9200)
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch # ES叢集名稱
 
  • 新增配置檔案canal-adapter/conf/es7/product.yml,用於配置MySQL中的表與Elasticsearch中索引的對映關係;
dataSourceKey: defaultDS # 源資料來源的key, 對應上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 對應MQ模式下的groupId, 只會同步對應groupId的資料
esMapping:
  _index: canal_product # es 的索引名稱
  _id: _id  # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
  sql: "SELECT
            p.id AS _id,
            p.title,
            p.sub_title,
            p.price,
            p.pic
        FROM
            product p"        # sql對映
  etlCondition: "where a.c_time>={}"   #etl的條件引數
  commitBatch: 3000   # 提交批大小
 
  • 使用startup.sh指令碼啟動canal-adapter服務;
sh bin/startup.sh
  • 如果需要停止canal-adapter服務可以使用如下命令。
sh bin/stop.sh

資料同步演示

經過上面的一系列步驟,canal的資料同步功能已經基本可以使用了,下面我們來演示下資料同步功能。

  • 首先我們需要在Elasticsearch中建立索引,和MySQL中的product表相對應,直接在Kibana的Dev Tools中使用如下命令建立即可;
PUT canal_product
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text"
      },
      "sub_title": {
        "type": "text"
      },
      "pic": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}
 

  • 建立完成後可以檢視下索引的結構;
GET canal_product/_mapping

  • 之後使用如下SQL語句在資料庫中建立一條記錄;
INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 1, 'iphone1', ' 智慧手機 2GB+64GB', 4999.00, NULL );
  • 建立成功後,在Elasticsearch中搜索下,發現數據已經同步了;
GET canal_product/_search

接下來的修改,刪除相同可自行測試。

canal-admin使用

  • 將我們下載好的壓縮包canal.admin-1.1.5-SNAPSHOT.tar.gz上傳到Linux伺服器,然後解壓到指定目錄/mydata/canal-admin,解壓完成後目錄結構如下;
├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ ├── canal_manager.sql
│ ├── canal-template.properties
│ ├── instance-template.properties
│ ├── logback.xml
│ └── public
│     ├── avatar.gif
│     ├── index.html
│     ├── logo.png
│     └── static
├── lib
└── logs
  • 建立canal-admin需要使用的資料庫canal_manager,建立SQL指令碼為/mydata/canal-admin/conf/canal_manager.sql,會建立如下表;

修改配置檔案conf/application.yml,按如下配置即可,主要是修改資料來源配置和canal-admin的管理賬號配置,注意需要用一個有讀寫許可權的資料庫賬號,比如管理賬號root:root

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: root
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
 
  • 接下來對之前搭建的canal-serverconf/canal_local.properties檔案進行配置,
  • 主要是修改canal-admin的配置
  • 修改完成後使用sh bin/startup.sh
  • local重啟canal-server
# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = 
 
  • 使用startup.sh指令碼啟動canal-admin服務;
sh bin/startup.sh
  • 訪問canal-admin的Web介面,輸入賬號密碼admin:123456即可登入
  • 訪問地址:http://192.168.3.101:8089

  • 登入成功後即可使用Web介面操作canal-server。

canal官方文件:https://github.com/alibaba/canal/wiki