canal實時抽取mysql資料傳送到kafka
阿新 • • 發佈:2018-12-13
基本說明
canal 1.1.1版本之後, 預設支援將canal server接收到的binlog資料直接投遞到MQ, 目前預設支援的MQ系統有:
- kafka: https://github.com/apache/kafka
- RocketMQ : https://github.com/apache/rocketmq
環境版本
- 作業系統:CentOS release 6.6 (Final)
- java版本: jdk1.8
- canal 版本: 請下載最新的安裝包,本文以當前v1.1.1 的canal.deployer-1.1.1.tar.gz為例
- MySQL版本 :5.7.18
- 注意 : 關閉所有機器的防火牆,同時注意啟動可以相互telnet ip 埠
一、 安裝zookeeper
二、安裝MQ
- Kafka安裝參考:Kafka QuickStart
- RocketMQ安裝參考:RocketMQ QuickStart
三、 安裝canal.server
3.1 下載壓縮包
到官網地址(release)下載最新壓縮包,請下載 canal.deployer-latest
.tar.gz
3.2 將canal.deployer 複製到固定目錄並解壓
mkdir -p /usr/local/canal
cp canal.deployer-1.1.1.tar.gz /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz
3.3 配置修改引數
a. 修改instance 配置檔案 vi conf/example/instance.properties
# 按需修改成自己的資料庫資訊 ################################################# ... canal.instance.master.address=192.168.1.20:3306 # username/password,資料庫的使用者名稱和密碼 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # mq config canal.mq.topic=example canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #庫名.表名: 唯一主鍵,多個表之間用逗號分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################
對應ip 地址的MySQL 資料庫需進行相關初始化與設定, 可參考 Canal QuickStart
b. 修改canal 配置檔案vi /usr/local/canal/conf/canal.properties
# ...
# 可選項: tcp(預設), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 叢集配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 預設50K, 由於kafka最大訊息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get資料的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為flat json格式物件
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
mq相關引數說明
引數名 | 引數說明 | 預設值 |
---|---|---|
canal.mq.servers | kafka為bootstrap.servers rocketMQ中為nameserver列表 |
127.0.0.1:6667 |
canal.mq.retries | 傳送失敗重試次數 | 0 |
canal.mq.batchSize | kafka為ProducerConfig.BATCH_SIZE_CONFIG rocketMQ無意義 |
16384 |
canal.mq.maxRequestSize | kafka為ProducerConfig.MAX_REQUEST_SIZE_CONFIG rocketMQ無意義 |
1048576 |
canal.mq.lingerMs | kafka為ProducerConfig.LINGER_MS_CONFIG rocketMQ無意義 |
1 |
canal.mq.bufferMemory | kafka為ProducerConfig.BUFFER_MEMORY_CONFIG rocketMQ無意義 |
33554432 |
canal.mq.producerGroup | kafka無意義 rocketMQ為ProducerGroup名 |
Canal-Producer |
canal.mq.canalBatchSize | 獲取canal資料的批次大小 | 50 |
canal.mq.canalGetTimeout | 獲取canal資料的超時時間 | 100 |
canal.mq.flatMessage | 是否為json格式 如果設定為false,對應MQ收到的訊息為protobuf格式 需要通過CanalMessageDeserializer進行解碼 |
true |
--- | --- | --- |
canal.mq.topic | mq裡的topic名 | 無 |
canal.mq.partition | 單佇列模式的分割槽下標, | 1 |
canal.mq.partitionsNum | 雜湊模式的分割槽數 | 無 |
canal.mq.partitionHash | 雜湊規則定義 庫名.表名 : 唯一主鍵,比如mytest.person: id |
無 |
其他詳細引數可參考Canal AdminGuide
3.4 啟動
cd /usr/local/canal/
sh bin/startup.sh
3.5 檢視日誌
a.檢視 logs/canal/canal.log
vi logs/canal/canal.log
b. 檢視instance的日誌:
vi logs/example/example.log
3.6 關閉
cd /usr/local/canal/
sh bin/stop.sh
3.7 MQ資料消費
canal.client下有對應的MQ資料消費的樣例工程,包含資料編解碼的功能
- kafka模式: com.alibaba.otter.canal.client.running.kafka.CanalKafkaClientExample
- rocketMQ模式: com.alibaba.otter.canal.client.running.rocketmq.CanalRocketMQClientExample
原文地址可參考git地址:
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart