1. 程式人生 > 實用技巧 >canal 執行sql 語句

canal 執行sql 語句

  • cancal 資料庫同步工具也可用於同步建索

(一)下載

官網架包地址為:https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-2

本人百度雲盤下載地址:

連結:https://pan.baidu.com/s/1MM5YGubaTW3Y2hy1tvBmPw

提取碼:jiur

(二)上傳解壓

tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz

查詢MySQLLinux環境中的my.cnf

mysql --help|grep 'my.cnf'

2.修改my.cnf
vi /etc/my.cnf
    
log
-bin=mysql-bin #新增這一行就ok binlog-format=ROW #選擇row模式 server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複

重啟 MySQL

檢視MySQL啟動狀態
 
service mysqld status(5.0版本是mysqld)
service mysql status(5.5.7版本是mysql)
 
重啟MySQL
 
service mysqld restart
service mysql restart (5.5.7版本命令)

在Linux中登入MySQL

mysql -u 使用者名稱 -p
如:mysql 
-u root -p 輸入密碼對應的賬號密碼

檢視binlog檔案是否開啟

show variables like 'log_%';

package com.zxsoft.ntmss.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author idea * @date 2019/5/6 * @Version V1.0 */ public class Canal2ElsticsearchIndex { private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>(); public static void main(String args[]) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.101", 11111), "example", "", ""); connector.connect(); int batchSize = 1000; try { connector.connect(); connector.rollback(); try { while (true) { //嘗試從master那邊拉去資料batchSize條記錄,有多少取多少 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(1000); } else { dataHandle(message.getEntries()); } connector.ack(batchId); //當佇列裡面堆積的sql大於一定數值的時候就模擬執行 if (SQL_QUEUE.size() >= 1) { executeQueueSql(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } finally { connector.disconnect(); } } /** * 模擬執行佇列裡面的sql語句 */ public static void executeQueueSql() { int size = SQL_QUEUE.size(); for (int i = 0; i < size; i++) { String sql = SQL_QUEUE.poll(); System.out.println("[sql]----> " + sql); } } /** * 資料處理 * * @param entrys */ private static void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException { for (Entry entry : entrys) { if (EntryType.ROWDATA == entry.getEntryType()) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.DELETE) { saveDeleteSql(entry); } else if (eventType == EventType.UPDATE) { saveUpdateSql(entry); } else if (eventType == EventType.INSERT) { saveInsertSql(entry); } } } } /** * 儲存更新語句 * * @param entry */ private static void saveUpdateSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> newColumnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set "); for (int i = 0; i < newColumnList.size(); i++) { sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'"); if (i != newColumnList.size() - 1) { sql.append(","); } } sql.append(" where "); List<Column> oldColumnList = rowData.getBeforeColumnsList(); for (Column column : oldColumnList) { if (column.getIsKey()) { //暫時只支援單一主鍵 sql.append(column.getName() + "=" + column.getValue()); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 儲存刪除語句 * * @param entry */ private static void saveDeleteSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where "); for (Column column : columnList) { if (column.getIsKey()) { //暫時只支援單一主鍵 sql.append(column.getName() + "=" + column.getValue()); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 儲存插入語句 * * @param entry */ private static void saveInsertSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " ("); for (int i = 0; i < columnList.size(); i++) { sql.append(columnList.get(i).getName()); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(") VALUES ("); for (int i = 0; i < columnList.size(); i++) { sql.append("'" + columnList.get(i).getValue() + "'"); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(")"); SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } }