
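canal-clientadapter data synchronization experiments

Background

Since version 1.1.1, canal ships with built-in client-side data synchronization. For an overview of the client adapters, see: ClientAdapter

RDB adapter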

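The RDB adapter synchronizes and imports data from MySQL into any relational database that supports JDBC. Databases tested:

  • MYSQL
  • ORACLE
  • POSTGRESQL
  • SQLSERVER
  • ...

In short, the client adapter syncs data from MySQL into databases of the types above. Elasticsearch is not a JDBC relational database; it is served by the separate es adapter (see Experiment 4).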

Installation and configuration

$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
$ mkdir canal-adapter
$ tar xf canal.adapter-1.1.4.tar.gz -C canal-adapter
$ ls canal-adapter    # same directory layout as canal-deploy
bin  conf  lib  logs  plugin

Data flow: src_mysql (192.168.20.40) -> canal-deploy -> client-adapter -> dest_mysql (192.168.20.41)
For installing and configuring canal-deploy, see the previous article.

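Experiment 1: MySQL -> MySQL sync of specified databases (same database names)

Sync the text and text1 databases of src_mysql. Friendly reminder: the databases must first be created manually on dest_mysql (create database xxx;).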

$ cat conf/application.yml

canal.conf:
  canalServerHost: 127.0.0.1:11111          # ip:port of the canal server in standalone mode
#  zookeeperHosts: slave1:2181               # zk address in cluster mode; if canalServerHost is set, canalServerHost takes precedence
#  mqServers: slave1:6667 #or rocketmq       # kafka or rocketMQ address; cannot be used together with canalServerHost
  flatMessage: true                         # flat message switch: deliver data as JSON strings; only effective in kafka/rocketMQ mode
  batchSize: 50                             # batch size of each fetch, in K
  syncBatchSize: 1000                       # number of records per sync batch
  retries: 0                                # number of retries; -1 means retry indefinitely
  timeout:                                  # sync timeout, in milliseconds
  mode: tcp # kafka rocketMQ                # canal client mode: tcp, kafka or rocketMQ
#  srcDataSources:                           # source databases
#    defaultDS:                              # custom name
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url
#      username: root                                            # jdbc username
#      password: 121212                                          # jdbc password
  canalAdapters:                            # adapter list
  - instance: example                       # canal instance name or MQ topic name
    groups:                                 # group list; adapters configured within a group run serially
    - groupId: g1                           # group id, used in MQ mode
      outerAdapters:                        # adapters within the group
      - name: logger                        # logging adapter
      - name: rdb                           # rdb-type sync
        key: mysql1                         # unique key of this adapter; must match outerAdapterKey in the table mapping files
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver                      # jdbc driver class; some jdbc jars must be copied into the lib directory manually
          jdbc.url: jdbc:mysql://192.168.20.41:3306/text?useUnicode=true   # jdbc url, text database
          jdbc.username: root                                              # jdbc username
          jdbc.password: 123123                                            # jdbc password
      - name: rdb
        key: mysql2                                                        # unique key for the second database, text1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.20.41:3306/text1?useUnicode=true  # text1 database
          jdbc.username: root
          jdbc.password: 123123
     ...

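Notes:

  1. In the outerAdapters configuration, name is always rdb; key is the unique identifier of the corresponding data source and must match outerAdapterKey in the table mapping files below; properties holds the JDBC parameters of the target database.
  2. The adapter automatically loads every table mapping file ending in .yml under conf/rdb.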
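Since a mismatch between an rdb adapter's `key` and a mapping file's `outerAdapterKey` is easy to miss, a small pre-flight check can catch it before startup. The Python sketch below assumes both YAML files have already been loaded into plain dicts (for example with a YAML library, not shown); `rdb_adapter_keys`, `find_unmatched_keys` and the sample `typo.yml` entry are hypothetical and not part of canal.

```python
from typing import Any


def rdb_adapter_keys(application_conf: dict[str, Any]) -> set[str]:
    """Collect the `key` of every rdb outer adapter declared in application.yml."""
    keys: set[str] = set()
    for adapter in application_conf["canal.conf"].get("canalAdapters", []):
        for group in adapter.get("groups", []):
            for outer in group.get("outerAdapters", []):
                if outer.get("name") == "rdb" and "key" in outer:
                    keys.add(outer["key"])
    return keys


def find_unmatched_keys(
    application_conf: dict[str, Any], mapping_files: dict[str, dict[str, Any]]
) -> dict[str, Any]:
    """Return {mapping file name: outerAdapterKey} for every mapping file
    whose outerAdapterKey matches no rdb adapter key."""
    keys = rdb_adapter_keys(application_conf)
    return {
        name: mapping.get("outerAdapterKey")
        for name, mapping in mapping_files.items()
        if mapping.get("outerAdapterKey") not in keys
    }


# Parsed equivalents of the Experiment 1 files; "typo.yml" is a made-up broken example.
APPLICATION_CONF = {
    "canal.conf": {
        "canalAdapters": [
            {
                "instance": "example",
                "groups": [
                    {
                        "groupId": "g1",
                        "outerAdapters": [
                            {"name": "logger"},
                            {"name": "rdb", "key": "mysql1"},
                            {"name": "rdb", "key": "mysql2"},
                        ],
                    }
                ],
            }
        ]
    }
}
MAPPING_FILES = {
    "mytest_user.yml": {"outerAdapterKey": "mysql1"},
    "mysql2.yml": {"outerAdapterKey": "mysql2"},
    "typo.yml": {"outerAdapterKey": "mysql3"},
}

if __name__ == "__main__":
    print(find_unmatched_keys(APPLICATION_CONF, MAPPING_FILES))  # {'typo.yml': 'mysql3'}
```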

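RDB mapping files

$ cat conf/rdb/mytest_user.yml
#dataSourceKey: defaultDS        # key of the source data source, matching srcDataSources configured above
#destination: example            # canal instance or MQ topic
#groupId:                        # groupId in MQ mode; only data for this groupId is synced
#outerAdapterKey: mysql1         # adapter key, matching the key in outerAdapters above
#concurrent: true                # parallel sync by primary-key hash; tables synced in parallel must never change their primary key, and the primary key must not be a foreign key of another synced table!!
#dbMapping:
#  database: text                # source database/schema
#  table: user                   # source table name
#  targetTable: mytest.tb_user   # target database.table
#  targetPk:                     # primary key mapping
#    id: id                      # for a composite primary key, map several columns on separate lines
##  mapAll: true                 # map the whole table; source and target column names must be identical (if targetColumns is also configured, targetColumns wins)
#  targetColumns:                # column mapping, format: target column: source column; the source column may be omitted if the names are the same
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}"    # simple ETL filter condition
#  commitBatch: 3000             # batch commit size

# Mirror schema synchronize config      #schema
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true  # mirror replication of both DDL and DML
  database: text  # the source and target schemas must be identical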


$ cp -a conf/rdb/mytest_user.yml conf/rdb/mysql2.yml
$ cat conf/rdb/mysql2.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql2   # must match the key in application.yml
concurrent: true
dbMapping:
  mirrorDb: true
  database: text1     # text1 database
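The `concurrent: true` option in these mapping files is described as parallel sync by primary-key hash. The Python sketch below illustrates that idea under the assumption of a fixed number of workers: each event is routed to a queue by hashing its primary-key values, so changes to the same row stay ordered on one worker while different rows can proceed in parallel. It is an illustration of the technique, not canal's implementation; `partition_by_pk` and the sample events are hypothetical.

```python
import zlib
from collections import defaultdict


def partition_by_pk(events, pk_names, workers):
    """Route each change event to a worker queue chosen by hashing its primary key.

    Events sharing a primary key always land in the same queue, in their
    original order; events for different keys may be processed in parallel.
    """
    queues = defaultdict(list)
    for event in events:
        pk_value = "|".join(str(event["data"][name]) for name in pk_names)
        # crc32 is stable across processes, unlike Python's salted hash() for str
        index = zlib.crc32(pk_value.encode("utf-8")) % workers
        queues[index].append(event)
    return dict(queues)


# Hypothetical change events for a table whose primary key is `id`
events = [
    {"type": "INSERT", "data": {"id": 1, "name": "a"}},
    {"type": "INSERT", "data": {"id": 2, "name": "b"}},
    {"type": "UPDATE", "data": {"id": 1, "name": "c"}},
]

if __name__ == "__main__":
    for index, queue in sorted(partition_by_pk(events, ["id"], workers=4).items()):
        print(index, [(e["type"], e["data"]["id"]) for e in queue])
```

The sketch also shows why the mapping comment forbids primary-key changes on parallel tables: if an UPDATE changed `id`, events carrying the new key could hash to a different queue than the row's earlier events, and their relative order would no longer be guaranteed.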

Start canal-deploy
Start client-adapter

$ cd bin && ./startup.sh

# On 192.168.20.40, create a table and insert data
$ mysql -uroot -h192.168.20.40 -p
mysql> use text;
mysql> create table `labixiaoxin` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` char(30) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
mysql> insert into labixiaoxin(name) values ('小新');

Check the canal-adapter log

$ less logs/adapter/adapter.log
2020-07-29 20:00:37.650 [pool-14-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"text","destination":"example","es":1596024037000,"groupId":null,"isDdl":true,"old":null,"pkNames":null,"sql":"create table `labixiaoxin` (\n`id` int(10) unsigned NOT NULL AUTO_INCREMENT,\n`name` char(30) NOT NULL,\nPRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8","table":"labixiaoxin","ts":1596024037650,"type":"CREATE"}     # DDL: create table

2020-07-29 20:00:50.469 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"name":"小新"},"database":"text","destination":"example","old":null,"table":"labixiaoxin","type":"INSERT"}  # DML: insert row
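Each DEBUG line above carries the change as a JSON object after a `DDL: ` or `DML: ` marker, which makes it convenient to check a sync by scripting over `logs/adapter/adapter.log`. A minimal Python sketch, assuming the line layout shown above; `parse_change` and `iter_changes` are hypothetical helpers, and the log format may differ between canal versions.

```python
import json
import re

# Matches the " - DDL: " / " - DML: " marker that precedes the JSON payload
_MARKER = re.compile(r" - (DDL|DML): ")
_DECODER = json.JSONDecoder()


def parse_change(line):
    """Return (kind, change_dict) for a DDL/DML debug line, or None for other lines.

    raw_decode stops at the end of the JSON object, so trailing text is ignored.
    """
    match = _MARKER.search(line)
    if match is None:
        return None
    change, _end = _DECODER.raw_decode(line, match.end())
    return match.group(1), change


def iter_changes(path):
    """Yield every parsed DDL/DML record from an adapter log file."""
    with open(path, encoding="utf-8") as log:
        for line in log:
            parsed = parse_change(line)
            if parsed is not None:
                yield parsed


SAMPLE = (
    "2020-07-29 20:00:50.469 [pool-6-thread-1] DEBUG "
    "c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: "
    '{"data":{"id":1,"name":"小新"},"database":"text","destination":"example",'
    '"old":null,"table":"labixiaoxin","type":"INSERT"}'
)

if __name__ == "__main__":
    kind, change = parse_change(SAMPLE)
    print(kind, change["database"], change["table"], change["type"], change["data"])
    # DML text labixiaoxin INSERT {'id': 1, 'name': '小新'}
```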

On dest_mysql (192.168.20.41), check whether the data was synced

$ mysql -uroot -h192.168.20.41 -p
mysql> use text;
mysql> show tables;
+----------------------------+
| Tables_in_text             |
+----------------------------+
| labixiaoxin                |
+----------------------------+
mysql> select * from labixiaoxin;
+----+--------+
| id | name   |
+----+--------+
|  1 | 小新   |
+----+--------+

The data has been synced.

Experiment 2: MySQL -> MySQL sync of specified tables (database and table names may differ)

$ cat conf/application.yml
...
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.20.41:3306/?useUnicode=true   # no database specified
          jdbc.username: root
          jdbc.password: 123123

$ cat conf/rdb/mytest_user.yml  # parameters are explained in Experiment 1; for multiple tables, configure one yml file per table
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: text
  table: labixiaoxin
  targetTable: xiaoxin.roles     # the database and table must be created in advance; source text.labixiaoxin -> target xiaoxin.roles
  targetPk:
    id: id
#  mapAll: true
  targetColumns:
    id: 
    names: name
    
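According to the targetColumns comment in Experiment 1, the format is `target column: source column`, and the source name may be left empty when both names are the same. With the mapping above, source `id` is therefore written to target `id`, and source `name` to target `names`. The Python sketch below restates that rule for illustration only; `apply_target_columns` is hypothetical and not part of canal.

```python
def apply_target_columns(source_row, target_columns):
    """Build a target-table row from a source row using a targetColumns mapping.

    Keys are target columns; values are source columns. An empty value
    (YAML `id:` loads as None) means the source column has the same name.
    """
    target_row = {}
    for target_column, source_column in target_columns.items():
        target_row[target_column] = source_row[source_column or target_column]
    return target_row


# targetColumns from the mapping file above, as a YAML parser would load it
TARGET_COLUMNS = {"id": None, "names": "name"}

if __name__ == "__main__":
    row = {"id": 1, "name": "小新"}  # a row of the source table text.labixiaoxin
    print(apply_target_columns(row, TARGET_COLUMNS))  # {'id': 1, 'names': '小新'}
```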

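Experiment 3: MySQL -> MySQL sync of specified data changes only

Not supported out of the box; this requires custom code.

Experiment 4: MySQL -> Elasticsearch

A simple test mapping a single table to an index:

mysql> create table `labixiaoxin` (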
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` char(30) NOT NULL,
`age` int(3) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

mysql> insert into labixiaoxin(name,age) values ('小新',5);

Create the index mapping in Elasticsearch

PUT /labixiaoxin    # index name
{
  "mappings": {
    "_doc": {
      "properties": {
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "age": {
          "type": "long"
        }
      }
    }
  }
}

Start canal-deploy
Configure client-adapter

$ cat conf/application.yml
canal.conf:
...
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.20.40:3306/text?useUnicode=true   # es can only point at one database at a time; to sync several databases, configure several srcDataSources
      username: root
      password: 123123
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 192.168.20.4:9200,192.168.20.5:9200,192.168.20.6:9200 # 127.0.0.1:9200 for rest mode; cluster addresses, comma separated
        properties:
          mode: rest                    # transport or rest; either mode can be specified
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: myes            # cluster name

$ cat conf/es/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: labixiaoxin            # es index name
  _type: _doc                    # es type name; not needed on es7
  _id: _id                       # es _id; if not configured, the pk item below must be configured and _id is assigned automatically by es
  upsert: true
#  pk: id                        # if _id is not used, specify a field as the primary key
  sql: "select a.id as _id,a.name,a.age from labixiaoxin a"    # sql mapping; here from labixiaoxin = from text.labixiaoxin
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}"
  commitBatch: 3000
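In this es mapping, the `sql` statement decides the document layout: the column aliased as `_id` supplies the document id, and the other selected columns (`name`, `age`) become `_source`, which matches the search result at the end of this experiment. The Python sketch below reproduces only that row-to-document split; `row_to_document` is hypothetical and ignores everything else the es adapter handles (joins, objFields, upsert, batching).

```python
def row_to_document(row, id_column="_id"):
    """Split a mapping-SQL result row into an Elasticsearch-style document.

    The column aliased to `_id` becomes the document id (as a string);
    all other selected columns become `_source`.
    """
    source = {column: value for column, value in row.items() if column != id_column}
    return {"_id": str(row[id_column]), "_source": source}


if __name__ == "__main__":
    # A row as produced by: select a.id as _id, a.name, a.age from labixiaoxin a
    row = {"_id": 1, "name": "小新", "age": 5}
    print(row_to_document(row))  # {'_id': '1', '_source': {'name': '小新', 'age': 5}}
```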

Start client-adapter
Check the log

$ less logs/adapter/adapter.log 
...
2020-07-30 15:23:23.611 [pool-3-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":1,"name":"小新","age":5}],"database":"text1","destination":"example","es":1596093803000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"labixiaoxin","ts":1596093803610,"type":"INSERT"} 

Check in Elasticsearch whether the data was inserted into the index

GET /labixiaoxin/_search
{
  "query": {"match_all": {}}
}

  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "labixiaoxin",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "name" : "小新",
          "age" : 5
        }
        ...

Experiment complete. For further configuration options, refer to the official documentation.

[Figure: options for syncing data to ES]