1. 程式人生 > >搭建並執行基於HA模式的canal

搭建並執行基於HA模式的canal

閱讀本文之前建議閱讀這篇文章,canal單機模式:http://blog.csdn.net/hackerwin7/article/details/37923607

機器準備:

mysql:192.168.213.41:3306

canal server:192.168.213.42:11111 、192.168.213.43:11111

zookeeper:192.168.213.44:4180、192.168.213.45:4180、192.168.213.46:4180

安裝與配置:

安裝配置mysql-》運程登入這個節點

  1. ssh [email protected]  

可以apt-get  或者  原始碼安裝都可以

配置my.cnf

  1. [mysqld]    
  2. log-bin=mysql-bin #新增這一行就ok    
  3. binlog-format=ROW #選擇row模式    
  4. server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複    

新建canal使用者,並賦予許可權
  1. CREATE USER canal IDENTIFIED BY 'canal';      
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';    
  3. -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;    
  4. FLUSH PRIVILEGES;   

關於配置mysql使用者密碼問題:http://blog.csdn.net/hackerwin7/article/details/38040057

安裝配置zookeeper-》下載zookeeper包  http://zookeeper.apache.org/releases.html#download

上傳到遠端節點

  1. scp zookeeper.tar.gz [email protected]  
  2. scp zookeeper.tar.gz [email protected]  
  3. scp zookeeper.tar.gz [email protected]
      

登入遠端節點(其他兩個節點的擦作大同小異)
  1. ssh [email protected]  

解壓包,配置,在cnof 目錄新建zoo.cfg
  1. tickTime=2000
  2. initLimit=5
  3. syncLimit=2
  4. dataDir=/home/canal/servers/zookeeper/data      
  5. dataLogDir=/home/canal/servers/zookeeper/logs      
  6. clientPort=4180
  7. server.44=192.168.213.44:2888:3888    
  8. server.45=192.168.213.45:2888:3888      
  9. server.46=192.168.213.46:2888:3888  

在目錄上述dataDir的目錄中建立myid, 將上述的三個數,server.X(我這裡的X是44,45,46),中的X值寫入myid檔案,三臺節點的myid這是不一樣的哈。

安裝canal-》 本機去下載canal的tar包 然後解壓,地址:https://github.com/alibaba/canal/releases 。

本機分別上傳包到兩個遠端節點

  1. scp canal.tar.gz [email protected]:/home/canal/  
  1. scp canal.tar.gz [email protected]:~/  
現在開始配置兩個遠端節點的canal

canal.properties:

  1. # 用逗號隔開 且 不留空格  
  2. canal.zkServers=192.168.213.44:4180,192.168.213.45:4180,192.168.213.46:4180  
  3. canal.instance.global.spring.xml = classpath:spring/default-instance.xml  
instance.properties:
  1. canal.instance.mysql.slaveId = 1234 ##另外一臺機器改成1235,保證slaveId不重複即可  
  2. canal.instance.master.address = 192.168.213.41:3306  

啟動執行:

1、mysql 啟動與執行

  1. /etc/init.d/mysqld restart #各版本mysql 啟動命令不太相同  
2、zookeeper 啟動與執行
  1. # 三個節點都要啟動  
  2. zookeeper/bin/zkServer.sh start  
然後在在三個節點中,隨意取一個執行客戶端
  1. ./zookeeper/bin/zkCli.sh -server 192.168.213.44:4180  
3、canal 啟動與執行

兩個節點都啟動canal

  1. ./canal/bin/startup.sh  

狀態檢測:

進入任一個 zookeeper 節點,進入zookeeper客戶端

  1. ./zookeeper/bin/zkCli.sh -server 192.168.213.44:4180  

裡面有get ls等命令來獲取相應節點資訊
  1. # 獲取正在執行的canal server  
  2. get /otter/canal/destinations/example/running  
  3. # 獲取正在連線的canal client  
  4. get /otter/canal/destinations/example/1001/running  
  5. # 獲取當前最後一次消費車成功的binlog  
  6. get /otter/canal/destinations/example/1001/cursor  

現在我們來執行client 客戶端 (見本文頂部的單機canal文章)

然後我們連結遠端mysql,在mysql執行相關insert creat 等語句

然後觀察client客戶端的具體抓取資訊。

HA模式切換:

現在我們來將通過zookeeper獲取正在執行的canal server,然後我們將當前執行的canal server 正常關閉掉,我們可以通過zookeeper看到另一臺canal server會成為正在執行的canal server,這就是HA模式的自動切換。這些都可以通過zookeeper查詢到狀態資訊。

本系列下篇文章:canal HA 的異常切換與 client的資料抓取。

直接在這裡附上執行了,注意迴圈try-catch的問題:

  1. /** 
  2.  * Created by hp on 14-7-17. 
  3.  */
  4. import java.net.InetSocketAddress;  
  5. import java.util.List;  
  6. import com.alibaba.otter.canal.client.CanalConnector;  
  7. import com.alibaba.otter.canal.common.utils.AddressUtils;  
  8. import com.alibaba.otter.canal.protocol.Message;  
  9. import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
  10. import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
  11. import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
  12. import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
  13. import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
  14. import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
  15. import com.alibaba.otter.canal.protocol.exception.*;  
  16. import com.alibaba.otter.canal.client.*;  
  17. import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;  
  18. import org.jetbrains.annotations.NotNull;  
  19. publicclass ClientSample {  
  20.     publicstaticvoid main(String args[]) {  
  21.         // 建立連結
  22.         CanalConnector connector = CanalConnectors.newClusterConnector("192.168.213.44:4180,192.168.213.45:4180,192.168.213.46:4180""example""""");  
  23.         int batchSize = 1;  
  24.         int emptyCount = 0;  
  25.         while(true) {  
  26.             try {  
  27.                 connector.connect();  
  28.                 connector.subscribe(".*\\..*");  
  29.                 while(true) {  
  30.                     Message messages = connector.getWithoutAck(1000);  
  31.                     long bachId = messages.getId();  
  32.                     int size = messages.getEntries().size();  
  33.                     if(bachId == -1 || size == 0) {  
  34.                         try {  
  35.                             Thread.sleep(1000);  
  36.                         } catch (InterruptedException e) {  
  37.                             e.printStackTrace();  
  38.                         }  
  39.                         System.out.println("No DATA!!!!!!!!!!!!!!!!!!!!!!!!");  
  40.                     } else {  
  41.                         printEntry(messages.getEntries());  
  42.                     }  
  43.                 }  
  44.             } catch (Exception e) {  
  45.                 System.out.println("============================================================connect crash");  
  46.             } finally {  
  47.                 connector.disconnect();  
  48.             }  
  49.         }  
  50.     }  
  51.     privatestaticvoid printEntry(@NotNull List<Entry> entrys) {  
  52.         for (Entry entry : entrys) {  
  53.             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
  54.                 continue;  
  55.             }  
  56.             RowChange rowChage = null;  
  57.             try {  
  58.                 rowChage = RowChange.parseFrom(entry.getStoreValue());  
  59.             } catch (Exception e) {  
  60.                 thrownew RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
  61.                         e);  
  62.             }  
  63.             EventType eventType = rowChage.getEventType();  
  64.             System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
  65.                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
  66.                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
  67.                     eventType));  
  68.             for (RowData rowData : rowChage.getRowDatasList()) {  
  69.                 if (eventType == EventType.DELETE) {  
  70.                     printColumn(rowData.getBeforeColumnsList());  
  71.                 } elseif (eventType == EventType.INSERT) {  
  72.                     printColumn(rowData.getAfterColumnsList());  
  73.                 } else {  
  74. <