搭建並執行基於HA模式的canal
閱讀本文之前建議閱讀這篇文章,canal單機模式:http://blog.csdn.net/hackerwin7/article/details/37923607
機器準備:
mysql:192.168.213.41:3306
canal server:192.168.213.42:11111 、192.168.213.43:11111
zookeeper:192.168.213.44:4180、192.168.213.45:4180、192.168.213.46:4180
安裝與配置:
安裝配置mysql-》運程登入這個節點
可以apt-get 或者 原始碼安裝都可以
配置my.cnf
- [mysqld]
- log-bin=mysql-bin #新增這一行就ok
- binlog-format=ROW #選擇row模式
- server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複
新建canal使用者,並賦予許可權
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
- FLUSH PRIVILEGES;
關於配置mysql使用者密碼問題:http://blog.csdn.net/hackerwin7/article/details/38040057
安裝配置zookeeper-》下載zookeeper包 http://zookeeper.apache.org/releases.html#download
上傳到遠端節點
- scp zookeeper.tar.gz [email protected]
- scp zookeeper.tar.gz [email protected]
-
scp zookeeper.tar.gz [email protected]
登入遠端節點(其他兩個節點的擦作大同小異)
解壓包,配置,在cnof 目錄新建zoo.cfg
- tickTime=2000
- initLimit=5
- syncLimit=2
- dataDir=/home/canal/servers/zookeeper/data
- dataLogDir=/home/canal/servers/zookeeper/logs
- clientPort=4180
- server.44=192.168.213.44:2888:3888
- server.45=192.168.213.45:2888:3888
- server.46=192.168.213.46:2888:3888
在目錄上述dataDir的目錄中建立myid, 將上述的三個數,server.X(我這裡的X是44,45,46),中的X值寫入myid檔案,三臺節點的myid這是不一樣的哈。
安裝canal-》 本機去下載canal的tar包 然後解壓,地址:https://github.com/alibaba/canal/releases 。
本機分別上傳包到兩個遠端節點
- scp canal.tar.gz [email protected]:/home/canal/
- scp canal.tar.gz [email protected]:~/
canal.properties:
- # 用逗號隔開 且 不留空格
- canal.zkServers=192.168.213.44:4180,192.168.213.45:4180,192.168.213.46:4180
- canal.instance.global.spring.xml = classpath:spring/default-instance.xml
- canal.instance.mysql.slaveId = 1234 ##另外一臺機器改成1235,保證slaveId不重複即可
- canal.instance.master.address = 192.168.213.41:3306
啟動執行:
1、mysql 啟動與執行
- /etc/init.d/mysqld restart #各版本mysql 啟動命令不太相同
- # 三個節點都要啟動
- zookeeper/bin/zkServer.sh start
- ./zookeeper/bin/zkCli.sh -server 192.168.213.44:4180
兩個節點都啟動canal
- ./canal/bin/startup.sh
狀態檢測:
進入任一個 zookeeper 節點,進入zookeeper客戶端
- ./zookeeper/bin/zkCli.sh -server 192.168.213.44:4180
裡面有get ls等命令來獲取相應節點資訊
- # 獲取正在執行的canal server
- get /otter/canal/destinations/example/running
- # 獲取正在連線的canal client
- get /otter/canal/destinations/example/1001/running
- # 獲取當前最後一次消費車成功的binlog
- get /otter/canal/destinations/example/1001/cursor
現在我們來執行client 客戶端 (見本文頂部的單機canal文章)
然後我們連結遠端mysql,在mysql執行相關insert creat 等語句
然後觀察client客戶端的具體抓取資訊。
HA模式切換:
現在我們來將通過zookeeper獲取正在執行的canal server,然後我們將當前執行的canal server 正常關閉掉,我們可以通過zookeeper看到另一臺canal server會成為正在執行的canal server,這就是HA模式的自動切換。這些都可以通過zookeeper查詢到狀態資訊。
本系列下篇文章:canal HA 的異常切換與 client的資料抓取。
直接在這裡附上執行了,注意迴圈try-catch的問題:
- /**
- * Created by hp on 14-7-17.
- */
- import java.net.InetSocketAddress;
- import java.util.List;
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.common.utils.AddressUtils;
- import com.alibaba.otter.canal.protocol.Message;
- import com.alibaba.otter.canal.protocol.CanalEntry.Column;
- import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
- import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
- import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
- import com.alibaba.otter.canal.protocol.exception.*;
- import com.alibaba.otter.canal.client.*;
- import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;
- import org.jetbrains.annotations.NotNull;
- publicclass ClientSample {
- publicstaticvoid main(String args[]) {
- // 建立連結
- CanalConnector connector = CanalConnectors.newClusterConnector("192.168.213.44:4180,192.168.213.45:4180,192.168.213.46:4180", "example", "", "");
- int batchSize = 1;
- int emptyCount = 0;
- while(true) {
- try {
- connector.connect();
- connector.subscribe(".*\\..*");
- while(true) {
- Message messages = connector.getWithoutAck(1000);
- long bachId = messages.getId();
- int size = messages.getEntries().size();
- if(bachId == -1 || size == 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("No DATA!!!!!!!!!!!!!!!!!!!!!!!!");
- } else {
- printEntry(messages.getEntries());
- }
- }
- } catch (Exception e) {
- System.out.println("============================================================connect crash");
- } finally {
- connector.disconnect();
- }
- }
- }
- privatestaticvoid printEntry(@NotNull List<Entry> entrys) {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
- RowChange rowChage = null;
- try {
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- thrownew RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.DELETE) {
- printColumn(rowData.getBeforeColumnsList());
- } elseif (eventType == EventType.INSERT) {
- printColumn(rowData.getAfterColumnsList());
- } else { <