canal-clientadapter Data Synchronization Experiments
Posted by 阿新 on 2020-07-30
Background
Since canal 1.1.1, a client-side data synchronization capability has been built in. For an overall introduction to the client adapter, see: ClientAdapter
RDB Adapter
The RDB adapter is used to sync and import data from MySQL into any relational database that supports JDBC. Databases verified in testing:
- MYSQL
- ORACLE
- POSTGRESQL
- SQLSERVER
- ELASTICSEARCH
- ...
In other words, the client adapter synchronizes data from MySQL to the database types listed above.
Installation and Configuration
$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
$ mkdir canal-adapter
$ tar xf canal.adapter-1.1.4.tar.gz -C canal-adapter
$ ls canal-adapter   # same directory layout as canal-deploy
bin conf lib logs plugin
Data flow: src_mysql(192.168.20.40) <-> canal-deploy <-> client-adapter <-> dest_mysql(192.168.20.41)
For canal-deploy installation and configuration, see the earlier article.
Experiment 1: MySQL -> MySQL sync of specified databases (same db names on both sides)
Sync the text and text1 databases from src_mysql. Friendly reminder: the databases must be created manually on dest_mysql with create database xxx;
$ cat conf/application.yml
canal.conf:
  canalServerHost: 127.0.0.1:11111 # ip:port of the canal server in standalone mode
#  zookeeperHosts: slave1:2181     # zk address in cluster mode; if canalServerHost is set, it takes precedence
#  mqServers: slave1:6667 #or rocketmq # kafka or rocketMQ address; cannot coexist with canalServerHost
  flatMessage: true                # flat-message switch: deliver data as JSON strings; only effective in kafka/rocketMQ mode
  batchSize: 50                    # batch size per fetch, in KB
  syncBatchSize: 1000              # number of records per sync batch
  retries: 0                       # retry count, -1 means retry forever
  timeout:                         # sync timeout, in milliseconds
  mode: tcp # kafka rocketMQ       # canal client mode: tcp kafka rocketMQ
#  srcDataSources:                 # source databases
#    defaultDS:                    # custom name
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # jdbc url
#      username: root              # jdbc username
#      password: 121212            # jdbc password
  canalAdapters:                   # adapter list
  - instance: example              # canal instance name or MQ topic name
    groups:                        # group list; adapters within one group are executed serially
    - groupId: g1                  # group id, used in MQ mode
      outerAdapters:               # adapter list within the group
      - name: logger               # log-printing adapter
      - name: rdb                  # rdb-type sync
        key: mysql1                # unique key of this adapter; must match outerAdapterKey in the table mapping config
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc driver; some jdbc jars must be placed under lib/ manually
          jdbc.url: jdbc:mysql://192.168.20.41:3306/text?useUnicode=true # jdbc url, text database
          jdbc.username: root      # jdbc username
          jdbc.password: 123123    # jdbc password
      - name: rdb
        key: mysql2                # a second unique key, for the text1 database
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.20.41:3306/text1?useUnicode=true # text1 database
          jdbc.username: root
          jdbc.password: 123123
...
Notes:
- In the outerAdapters config: name is always rdb; key is the unique identifier of the corresponding data source and must match outerAdapterKey in the table mapping files below; properties holds the jdbc parameters of the target database.
- The adapter automatically loads every table mapping file ending in .yml under conf/rdb.
RDB mapping files
$ cat conf/rdb/mytest_user.yml
#dataSourceKey: defaultDS      # key of the source data source, matching srcDataSources above
#destination: example          # canal instance or MQ topic
#groupId:                      # groupId in MQ mode; only data of the matching groupId is synced
#outerAdapterKey: mysql1       # adapter key, matching the key configured in outerAdapters above
#concurrent: true              # sync in parallel, hashed by primary key; tables synced in parallel must guarantee the PK never changes and is not a foreign key of another synced table!!
#dbMapping:
#  database: text              # database/schema of the source
#  table: user                 # source table name
#  targetTable: mytest.tb_user # target database.table
#  targetPk:                   # primary-key mapping
#    id: id                    # for a composite PK, map multiple keys on separate lines
##  mapAll: true               # map the whole table; requires identical column names on both sides (if targetColumns is also configured, targetColumns wins)
#  targetColumns:              # column mapping, format target_column: source_column; the source name may be omitted if identical
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}" # simple filter
#  commitBatch: 3000           # commit batch size

# Mirror schema synchronize config
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true               # mirror replication of both DDL and DML
  database: text               # the two databases' schemas must be identical

$ cp -a conf/rdb/mytest_user.yml conf/rdb/mysql2.yml
$ cat conf/rdb/mysql2.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql2        # matches the key above
concurrent: true
dbMapping:
  mirrorDb: true
  database: text1              # text1 database
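The two mirrorDb mapping files effectively route each binlog event to a target data source by database name. A minimal Python sketch of that routing idea (the dict and function below are illustrative only, not canal's actual implementation):

```python
# Hypothetical sketch: how the two mirrorDb mapping files pair a source
# database name with the outerAdapterKey (and thus the jdbc.url) that
# receives its events. Values mirror the configs above, not canal's code.
MAPPINGS = {
    "text":  "mysql1",   # conf/rdb/mytest_user.yml -> jdbc url .../text
    "text1": "mysql2",   # conf/rdb/mysql2.yml      -> jdbc url .../text1
}

def route_event(event: dict) -> str:
    """Return the adapter key that should apply this binlog event."""
    db = event["database"]
    if db not in MAPPINGS:
        raise KeyError(f"no rdb mapping configured for database {db!r}")
    return MAPPINGS[db]

print(route_event({"database": "text", "table": "user", "type": "INSERT"}))
```

This is why an event for a database with no mapping file is simply not applied to any target.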
Start canal-deploy
Start client-adapter
$ cd bin && ./startup.sh
# Create a table and insert a row on 192.168.20.40
$ mysql -uroot -h192.168.20.40 -p
mysql > use text;
mysql > create table `labixiaoxin` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` char(30) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
mysql> insert into labixiaoxin(name) values ('小新');
View the canal-adapter log
$ less logs/adapter/adapter.log
2020-07-29 20:00:37.650 [pool-14-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"text","destination":"example","es":1596024037000,"groupId":null,"isDdl":true,"old":null,"pkNames":null,"sql":"create table `labixiaoxin` (\n`id` int(10) unsigned NOT NULL AUTO_INCREMENT,\n`name` char(30) NOT NULL,\nPRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8","table":"labixiaoxin","ts":1596024037650,"type":"CREATE"} # DDL: table creation
2020-07-29 20:00:50.469 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"name":"小新"},"database":"text","destination":"example","old":null,"table":"labixiaoxin","type":"INSERT"} # DML: row insert
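The DEBUG lines above log each event as a flat-message JSON object. A small Python sketch parsing the DML entry from the log (the fields are copied verbatim from the log line):

```python
import json

# DML entry copied from the adapter log above (a line break added for readability)
raw = '''{"data":{"id":1,"name":"小新"},"database":"text",
"destination":"example","old":null,"table":"labixiaoxin","type":"INSERT"}'''

event = json.loads(raw)
assert event["type"] == "INSERT"  # DML kind: INSERT / UPDATE / DELETE
# "data" holds the row values; "old" holds prior values for UPDATEs
print(f'{event["database"]}.{event["table"]}: {event["data"]}')
```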
Check on dest_mysql(192.168.20.41) whether the data has synced
$ mysql -uroot -h192.168.20.41 -p
mysql > use text;
mysql > show tables;
+----------------------------+
| Tables_in_text |
+----------------------------+
| labixiaoxin |
+----------------------------+
mysql> select * from labixiaoxin;
+----+--------+
| id | name |
+----+--------+
| 1 | 小新 |
+----+--------+
The data has been synchronized.
Experiment 2: MySQL -> MySQL sync of a specified table (db name and table name may differ)
$ cat conf/application.yml
...
canalAdapters:
- instance: example # canal instance name or mq topic name
  groups:
  - groupId: g1
    outerAdapters:
    - name: logger
    - name: rdb
      key: mysql1
      properties:
        jdbc.driverClassName: com.mysql.jdbc.Driver
        jdbc.url: jdbc:mysql://192.168.20.41:3306/?useUnicode=true # no database specified
        jdbc.username: root
        jdbc.password: 123123
$ cat conf/rdb/mytest_user.yml # parameters explained in Experiment 1; for multiple tables, configure one yml file per table
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: text
  table: labixiaoxin
  targetTable: xiaoxin.roles # target db and table must exist in advance; source text.labixiaoxin -> target xiaoxin.roles
  targetPk:
    id: id
#  mapAll: true
  targetColumns:
    id:
    names: name
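The targetColumns rule is target_column: source_column, with the source name omitted when both sides match. A minimal Python sketch applying that rule to one source row (mapping values taken from the yml above; the function itself is an illustration, not the adapter's code):

```python
# targetColumns from the yml above: a None value means "same name on both sides".
TARGET_COLUMNS = {
    "id": None,        # id    <- id   (source name omitted in the yml)
    "names": "name",   # names <- name (column renamed on the target side)
}

def map_row(source_row: dict) -> dict:
    """Build the target-table row according to the targetColumns mapping."""
    return {
        target: source_row[source if source else target]
        for target, source in TARGET_COLUMNS.items()
    }

print(map_row({"id": 1, "name": "小新"}))
```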
Experiment 3: MySQL -> MySQL sync of only specified data changes
Not achievable through configuration alone; it requires custom client code.
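As a rough illustration of what such custom code would do: consume the flat-message events and apply only those matching a business condition. A hypothetical Python sketch (the filter rule and event shape are assumptions for illustration, not a canal API):

```python
# Hypothetical filter step for "sync only specified data changes".
# Events use the flat-message shape seen in the adapter logs above.
def should_sync(event: dict) -> bool:
    """Example rule: apply only INSERTs into labixiaoxin with a non-empty name."""
    return (
        event.get("type") == "INSERT"
        and event.get("table") == "labixiaoxin"
        and bool(event.get("data", {}).get("name"))
    )

events = [
    {"type": "INSERT", "table": "labixiaoxin", "data": {"id": 1, "name": "小新"}},
    {"type": "INSERT", "table": "other", "data": {"id": 2, "name": "x"}},
    {"type": "DELETE", "table": "labixiaoxin", "data": {"id": 1}},
]
to_apply = [e for e in events if should_sync(e)]
print(len(to_apply))  # only the first event passes the filter
```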
Experiment 4: MySQL -> Elasticsearch
Here is a simple single-table-to-index mapping test:
mysql > create table `labixiaoxin` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` char(30) NOT NULL,
`age` int(3) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
mysql > insert into labixiaoxin(name,age) values ('小新',5);
Create the index mapping in Elasticsearch:
PUT /labixiaoxin # index name
{
"mappings": {
"_doc": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"age": {
"type": "long"
}
}
}
}
}
Start canal-deploy
Configure client-adapter
$ cat conf/application.yml
canal.conf:
  ...
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.20.40:3306/text?useUnicode=true # es targets one source database at a time; to sync several databases, define several srcDataSources
      username: root
      password: 123123
  canalAdapters:
  - instance: example # canal instance name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 192.168.20.4:9200,192.168.20.5:9200,192.168.20.6:9200 # cluster addresses, comma separated; 127.0.0.1:9200 for rest mode
        properties:
          mode: rest #transport # transport or rest mode
          # security.auth: test:123456 # only used for rest mode
          cluster.name: myes # cluster name
$ cat conf/es/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: labixiaoxin # es index name
  _type: _doc # es type name; not needed under es7
  _id: _id # the es _id; if not configured, the pk item below must be set and _id is auto-assigned by es
  upsert: true
#  pk: id # if _id is not needed, one column must be designated as the primary key
  sql: "select a.id as _id,a.name,a.age from labixiaoxin a" # sql mapping; here "from labixiaoxin" means "from text.labixiaoxin"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}"
  commitBatch: 3000
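The sql mapping determines the ES document: the column aliased as _id becomes the document id, and the remaining selected columns become the _source fields. A hedged Python sketch of that transformation (illustrative only, not the adapter's actual code):

```python
# Illustrative only: how one source row becomes an ES doc under the mapping
#   sql: "select a.id as _id, a.name, a.age from labixiaoxin a"
def row_to_es_doc(row: dict):
    """Return (document _id, _source body) for the 'labixiaoxin' index."""
    doc_id = str(row["id"])                            # a.id aliased as _id
    source = {"name": row["name"], "age": row["age"]}  # remaining columns
    return doc_id, source

doc_id, source = row_to_es_doc({"id": 1, "name": "小新", "age": 5})
print(doc_id, source)
```

Compare the output against the _search result shown below the logs: the row's id appears as "_id": "1" and the other columns as the _source body.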
Start client-adapter
View the log
$ less logs/adapter/adapter.log
...
2020-07-30 15:23:23.611 [pool-3-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":1,"name":"小新","age":5}],"database":"text1","destination":"example","es":1596093803000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"labixiaoxin","ts":1596093803610,"type":"INSERT"}
Check in Elasticsearch that the data was inserted into the index:
GET /labixiaoxin/_search
{
"query": {"match_all": {}}
}
"hits" : {
"total" : 1,
"max_score" : 1.0,
"hits" : [
{
"_index" : "labixiaoxin",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"name" : "小新",
"age" : 5
}
...
The experiments are complete; for up-to-date configuration details, refer to the official documentation.
Technology selection for syncing to es: (comparison image omitted; see the original image source)