搭建:canal部署與例項執行和解析MysqlBinlog日誌傳送到Kafka中
阿新 • • 發佈:2019-01-13
1、準備:
github:https://github.com/alibaba/canal
裡面有包括canal的文件,server端 client端的 例子 原始碼包等等。
2、canal概述:
canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。
早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。ps.
目前內部使用的同步,已經支援mysql5.x和 Oracle部分版本的日誌解析
基於日誌增量訂閱&消費支援的業務:
資料庫映象
資料庫實時備份
多級索引 (賣家和買家各自分庫索引)
search build
業務cache重新整理
價格變化等重要業務訊息
keyword:資料庫同步,增量訂閱&消費。
3、canal工作原理:
從上層來看,複製分成三步:master將改變記錄到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);
slave將master的binary log events拷貝到它的中繼日誌(relay log);
slave重做中繼日誌中的事件,將改變反映它自己的資料。
4、部署canal:
部署canal-server:
(1)開啟MySQL的binlog功能,並配置binlog模式為row。
在my.cnf 加入如下:
- [mysqld]
- log-bin=mysql-bin #新增這一行就ok
- binlog-format=ROW #選擇row模式
- server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複
(2)在mysql中 配置canal資料庫管理使用者,配置相應許可權(repication許可權)
- CREATEUSER canal IDENTIFIED BY'canal';
- GRANTSELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO'canal'@'%';
- -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
- FLUSH PRIVILEGES;
解壓到相應資料夾
- tar -zxvf canal canal
- drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
- drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
- drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
- drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs
修改配置 instance.properties
- vim canal/conf/example/instance.properties
- #################################################
- ## mysql serverId
- canal.instance.mysql.slaveId = 1234
- # position info,需要改成自己的資料庫資訊
- canal.instance.master.address = 127.0.0.1:3306
- canal.instance.master.journal.name =
- canal.instance.master.position =
- canal.instance.master.timestamp =
- #canal.instance.standby.address =
- #canal.instance.standby.journal.name =
- #canal.instance.standby.position =
- #canal.instance.standby.timestamp =
- # username/password,需要改成自己的資料庫資訊
- canal.instance.dbUsername = canal
- canal.instance.dbPassword = canal
- canal.instance.defaultDatabaseName = canal_test
- canal.instance.connectionCharset = UTF-8
- # table regex
- canal.instance.filter.regex = .*\\..*
- #################################################
然後cd到bin目錄 啟動和停止canal-server
啟動
- ./startup.sh
- ./stop.sh
驗證啟動狀態,檢視log檔案
- vim canal/log/canal/canal.log
- 2014-07-18 10:21:08.525 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
- 2014-07-18 10:21:08.609 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.109.201:11111]
- 2014-07-18 10:21:09.037 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
上述日誌資訊顯示啟動canal成功
執行canal-client例項:
(1)建立例項maven工程
- mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
(2)新增pom依賴:
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.0.12</version>
- </dependency>
(4)canal-client.Java 例項程式碼
- /**
- * Created by hp on 14-7-17.
- */
- import java.net.InetSocketAddress;
- import java.util.List;
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.common.utils.AddressUtils;
- import com.alibaba.otter.canal.protocol.Message;
- import com.alibaba.otter.canal.protocol.CanalEntry.Column;
- import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
- import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
- import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
- import com.alibaba.otter.canal.client.*;
- import org.jetbrains.annotations.NotNull;
- publicclass ClientSample {
- publicstaticvoid main(String args[]) {
- // 建立連結
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
- 11111), "example", "", "");
- int batchSize = 1000;
- int emptyCount = 0;
- try