1. 程式人生 > 其它 >最簡單的canal 1.1.6服務搭建方法

最簡單的canal 1.1.6服務搭建方法

前言:

        因為在專案中集成了ElasticSearch,用於某些業務場景的搜素或篩選。這裡關於ElasticSearch就不做介紹了,雖然解決了全文搜尋的效能問題。但是當出現一些頻繁更新的資料放置在ES就有點麻煩了。

        這時候,一款能把MySQL資料即時同步到ElasticSearch的工具就顯得格外重要了。經過比較篩選我選擇了阿里的canal,這裡應該就有人會說用Elastic全家桶的Logstash或filebeat不是更好嗎!接下來就會介紹我為什麼選canal以及最重要的canal的本地搭建(零基礎視角)。

 

對比:

        作為增量資料消費,應用與各種場景都有與之對應領域比較好的工具。比如上面的Logstash和filebeat雖都同屬於Elastic但也截然不同,更別說flume等等工具。我選擇canal的原因是他有介面卡,只要是MySQL同步場景,對面是一個能儲存的都可以,比如檔案,佇列,資料倉庫,ES等等都可以。而其他的更多的應用場景可能是定時的日誌採集,但cannal是通過監聽binlog後觸發操作,比較沒有好壞,只有適合與不適合。

 

場景:

1. 實時同步MySQL資料到ElasticSearch

2. redis快取的即時更新

3. 業務上商品訂閱降價等等

 

下載:

1. 地址:

https://github.com/alibaba/canal/releases

2.  github上如何下載原始碼發行包

    剛開始開啟canal沒看到的可能只是工具的原始碼,但是在windows下需要工具包。原始碼編譯打包對於新手來說還不是時候,所以就介紹一下github等其他版本倉庫別人開源的工具類程式碼如何下載發行包。

(1). 進入github倉庫主頁,一般是在右上角有一個releases超連結,點選進入就有各迭代版本的發行包介紹和下載資源了。

3. 哪個才是canal服務的工具包?

        點選進去可能看到的有如下好幾個包,而給我們開發語言(客戶端)能提供服務的是deployer,另外幾個分別是管控臺和介面卡等,以後有時間再介紹他們的用途和安裝方法。

配置:

1.  MySQL的binlog開啟

    因為canal通過偽裝成MySQL一個slave,通過dump協議與master通訊,並解析MySQL的binlog檔案。canal的工作原理和MySQL的binlog開啟方法這裡就不做介紹了,網上都比較多。

2.  canal例項的主要的幾個配置

(1).  MySQL賬戶, conf/example/instance.properties

canal.instance.dbUsername=canal     // 資料庫賬戶
canal.instance.dbPassword=canal     // 資料庫密碼

(2). 資料庫新建上面賬戶並授權

#建立使用者
CREATE USER canal IDENTIFIED BY 'canal';  
#建立許可權
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

(3).  canal服務埠賬戶,conf/canal.properties

# tcp指定的IP, 不填表示0.0.0.0
canal.ip =

# register ip to zookeeper
canal.register.ip =

# canal服務埠
canal.port = 11111
canal.metrics.pull.port = 11112

# canal 服務的賬號密碼,註釋表示客戶端連線無需賬號密碼
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

         下載完canal deployer後,如果只是作為客戶端請求測試,需要配置也就是上面兩個檔案(conf/example/instance.properties 和 conf/canal.properties), 除了上面按照自己的配置,其他的都保持原有引數不變即可。

 

啟動:

        windows下啟動,開啟cmd,進入根目錄下的bin,然後執行startup.bat就可以。啟動成功與否可以在logs目錄的兩個日誌檔案中檢視,如果有Error字眼,一般就是配置有問題,可以根據具體報錯具體查詢原因。

 

測試:

          因為canal是Java開發的,所以測試也採用Java作為客戶端列印一下實時解析binlog的結果。不過在跑Java程式前,windows可以通過以下兩個命令檢視canal啟動情況。

telnet 127.0.0.1 11111
netstat -ano | findstr "11111"

           接下來也依然以新手的視角(因為以前都是寫PHP,所以看我檔案的應該也都是PHPer,所以熟悉的同學們可以複製下面程式碼測試)建立Java專案,構建Jar包,編寫canal客戶端,編譯執行等等。

1.  開啟IntekkiJ IDEA, 建立一個Maven專案。

2.  開啟pom.xml新增以下依賴。

<dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
    </dependencies>

3. 開啟編輯器右上角的Maven按鈕,並按重新整理,等待下載依賴。

4.  src/main/java 新建一個 Java類檔案,貼上以下程式碼。

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class CanalClientTest {


    public static void main(String args[]) {
        // 建立連結,這裡就需要canal裡配置的埠,賬號密碼,destination預設先example就行
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1",11111),
                "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;

        try {
            connector.connect();
            // 防止 deserializer failed報錯
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;

            while (emptyCount < totalEmptyCount) {
                connector.subscribe();

                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }

                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交確認
                // connector.rollback(batchId); // 處理失敗, 回滾資料
            }

            System.out.println("empty too many times, exit");

        } finally {

            connector.disconnect();
        }

    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

5. 右擊該檔案Run, 開始編譯執行,並開啟運行臺,然後再在資料庫裡隨便找一張表,隨便修改以下資料,就可以實時檢視變動的資訊了。

 

交流學習: