1. 程式人生 > >canal實時抽取mysql資料傳送到kafka

canal實時抽取mysql資料傳送到kafka

基本說明

canal 1.1.1版本之後, 預設支援將canal server接收到的binlog資料直接投遞到MQ, 目前預設支援的MQ系統有:

 

環境版本

  • 作業系統:CentOS release 6.6 (Final)
  • java版本: jdk1.8
  • canal 版本: 請下載最新的安裝包,本文以當前v1.1.1 的canal.deployer-1.1.1.tar.gz為例
  • MySQL版本 :5.7.18
  • 注意 : 關閉所有機器的防火牆,同時注意啟動可以相互telnet ip 埠

 

一、 安裝zookeeper

參考:Zookeeper QuickStart

 

二、安裝MQ

 

三、 安裝canal.server

 

3.1 下載壓縮包

到官網地址(release)下載最新壓縮包,請下載 canal.deployer-latest.tar.gz

 

3.2 將canal.deployer 複製到固定目錄並解壓

mkdir -p /usr/local/canal
cp   canal.deployer-1.1.1.tar.gz   /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz 

 

3.3 配置修改引數

a. 修改instance 配置檔案 vi conf/example/instance.properties

#  按需修改成自己的資料庫資訊
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,資料庫的使用者名稱和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#庫名.表名: 唯一主鍵,多個表之間用逗號分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

對應ip 地址的MySQL 資料庫需進行相關初始化與設定, 可參考 Canal QuickStart

b. 修改canal 配置檔案vi /usr/local/canal/conf/canal.properties

# ...
# 可選項: tcp(預設), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 叢集配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 預設50K, 由於kafka最大訊息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get資料的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為flat json格式物件
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

 

mq相關引數說明

引數名 引數說明 預設值
canal.mq.servers kafka為bootstrap.servers
rocketMQ中為nameserver列表
127.0.0.1:6667
canal.mq.retries 傳送失敗重試次數 0
canal.mq.batchSize kafka為ProducerConfig.BATCH_SIZE_CONFIG
rocketMQ無意義
16384
canal.mq.maxRequestSize kafka為ProducerConfig.MAX_REQUEST_SIZE_CONFIG
rocketMQ無意義
1048576
canal.mq.lingerMs kafka為ProducerConfig.LINGER_MS_CONFIG
rocketMQ無意義
1
canal.mq.bufferMemory kafka為ProducerConfig.BUFFER_MEMORY_CONFIG
rocketMQ無意義
33554432
canal.mq.producerGroup kafka無意義
rocketMQ為ProducerGroup名
Canal-Producer
canal.mq.canalBatchSize 獲取canal資料的批次大小 50
canal.mq.canalGetTimeout 獲取canal資料的超時時間 100
canal.mq.flatMessage 是否為json格式
如果設定為false,對應MQ收到的訊息為protobuf格式
需要通過CanalMessageDeserializer進行解碼
true
--- --- ---
canal.mq.topic mq裡的topic名
canal.mq.partition 單佇列模式的分割槽下標, 1
canal.mq.partitionsNum 雜湊模式的分割槽數
canal.mq.partitionHash 雜湊規則定義
庫名.表名 : 唯一主鍵,比如mytest.person: id

其他詳細引數可參考Canal AdminGuide

 

3.4 啟動

cd /usr/local/canal/
sh bin/startup.sh

 

3.5 檢視日誌

a.檢視 logs/canal/canal.log

vi logs/canal/canal.log

b. 檢視instance的日誌:

vi logs/example/example.log

 

3.6 關閉

cd /usr/local/canal/
sh bin/stop.sh

 

3.7 MQ資料消費

canal.client下有對應的MQ資料消費的樣例工程,包含資料編解碼的功能

原文地址可參考git地址:

https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart