1. 程式人生 > >利用canal使Mysql緩衝Redis

利用canal使Mysql緩衝Redis

(啟動linux中的redis、mysql、jdk和canal(這三個在linux中安裝好吧,具體步驟檢視我其他的文章)關閉linux防火牆和允許mysql遠端訪問)

從頭建立工程
依賴配置:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.12</version>
</dependency>
    <dependency
>
<groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>3.1.2.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId
>
redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.4.2</version> </dependency>
  1. 建立mvn標準工程:

mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample

  1. 修改pom.xml,新增上面的依賴
  2. ClientSample程式碼
package com
.alibaba.otter.canal.sample; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; public class SimpleCanalClientExample { public static void main(String args[]) { // 建立連結 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(“可以直接改成你的虛擬機器中linux的ip”), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount &lt; totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾資料 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List&lt;Entry&gt; entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------&gt; before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------&gt; after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List&lt;Column&gt; columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } private static void redisInsert( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisUpdate( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisDelete( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.delKey("user:"+ columns.get(0).getValue()); } } }

4.RedisUtil程式碼
這裡主要做兩個工作,一個是迴圈從Canal上取資料,一個是將資料更新至Redis

    package canal.sample;  

    import redis.clients.jedis.Jedis;  
    import redis.clients.jedis.JedisPool;  
    import redis.clients.jedis.JedisPoolConfig;  

    public class RedisUtil {  

        // Redis伺服器IP  
        private static String ADDR = "10.1.2.190";  

        // Redis的埠號  
        private static int PORT = 6379;  

        // 訪問密碼  
        private static String AUTH = "admin";  

        // 可用連線例項的最大數目,預設值為8;  
        // 如果賦值為-1,則表示不限制;如果pool已經分配了maxActive個jedis例項,則此時pool的狀態為exhausted(耗盡)。  
        private static int MAX_ACTIVE = 1024;  

        // 控制一個pool最多有多少個狀態為idle(空閒的)的jedis例項,預設值也是8。  
        private static int MAX_IDLE = 200;  

        // 等待可用連線的最大時間,單位毫秒,預設值為-1,表示永不超時。如果超過等待時間,則直接丟擲JedisConnectionException;  
        private static int MAX_WAIT = 10000;  

        // 過期時間  
        protected static int  expireTime = 660 * 660 *24;  

        // 連線池  
        protected static JedisPool pool;  

        /** 
         * 靜態程式碼,只在初次呼叫一次 
         */  
        static {  
            JedisPoolConfig config = new JedisPoolConfig();  
            //最大連線數  
            config.setMaxTotal(MAX_ACTIVE);  
            //最多空閒例項  
            config.setMaxIdle(MAX_IDLE);  
            //超時時間  
            config.setMaxWaitMillis(MAX_WAIT);  
            //  
            config.setTestOnBorrow(false);  
            pool = new JedisPool(config, ADDR, PORT, 1000);  
        }  

        /** 
         * 獲取jedis例項 
         */  
        protected static synchronized Jedis getJedis() {  
            Jedis jedis = null;  
            try {  
                jedis = pool.getResource();  
            } catch (Exception e) {  
                e.printStackTrace();  
                if (jedis != null) {  
                    pool.returnBrokenResource(jedis);  
                }  
            }  
            return jedis;  
        }  

        /** 
         * 釋放jedis資源 
         *  
         * @param jedis 
         * @param isBroken 
         */  
        protected static void closeResource(Jedis jedis, boolean isBroken) {  
            try {  
                if (isBroken) {  
                    pool.returnBrokenResource(jedis);  
                } else {  
                    pool.returnResource(jedis);  
                }  
            } catch (Exception e) {  

            }  
        }  

        /** 
         *  是否存在key 
         *  
         * @param key 
         */  
        public static boolean existKey(String key) {  
            Jedis jedis = null;  
            boolean isBroken = false;  
            try {  
                jedis = getJedis();  
                jedis.select(0);  
                return jedis.exists(key);  
            } catch (Exception e) {  
                isBroken = true;  
            } finally {  
                closeResource(jedis, isBroken);  
            }  
            return false;  
        }  

        /** 
         *  刪除key 
         *  
         * @param key 
         */  
        public static void delKey(String key) {  
            Jedis jedis = null;  
            boolean isBroken = false;  
            try {  
                jedis = getJedis();  
                jedis.select(0);  
                jedis.del(key);  
            } catch (Exception e) {  
                isBroken = true;  
            } finally {  
                closeResource(jedis, isBroken);  
            }  
        }  

        /** 
         *  取得key的值 
         *  
         * @param key 
         */  
        public static String stringGet(String key) {  
            Jedis jedis = null;  
            boolean isBroken = false;  
            String lastVal = null;  
            try {  
                jedis = getJedis();  
                jedis.select(0);  
                lastVal = jedis.get(key);  
                jedis.expire(key, expireTime);  
            } catch (Exception e) {  
                isBroken = true;  
            } finally {  
                closeResource(jedis, isBroken);  
            }  
            return lastVal;  
        }  

        /** 
         *  新增string資料 
         *  
         * @param key 
         * @param value 
         */  
        public static String stringSet(String key, String value) {  
            Jedis jedis = null;  
            boolean isBroken = false;  
            String lastVal = null;  
            try {  
                jedis = getJedis();  
                jedis.select(0);  
                lastVal = jedis.set(key, value);  
                jedis.expire(key, expireTime);  
            } catch (Exception e) {  
                e.printStackTrace();  
                isBroken = true;  
            } finally {  
                closeResource(jedis, isBroken);  
            }  
            return lastVal;  
        }  

        /** 
         *  新增hash資料 
         *  
         * @param key 
         * @param field 
         * @param value 
         */  
        public static void hashSet(String key, String field, String value) {  
            boolean isBroken = false;  
            Jedis jedis = null;  
            try {  
                jedis = getJedis();  
                if (jedis != null) {  
                    jedis.select(0);  
                    jedis.hset(key, field, value);  
                    jedis.expire(key, expireTime);  
                }  
            } catch (Exception e) {  
                isBroken = true;  
            } finally {  
                closeResource(jedis, isBroken);  
            }  
        }  

    }  
  1. 執行Client

首先啟動linux中的Canal Server

啟動Canal Client後,可以從控制檯從看到類似訊息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8

此時代表當前資料庫無變更資料

  1. 觸發資料庫變更
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    ->   PRIMARY KEY (`ID`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)

mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

6.可以從控制檯中看到:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8

================> binlog[mysql-bin.000001:1082] , name[test,xdual] , eventType : INSERT
ID : 3 update=true
X : 2017-06-13 07:03:32 update=true

empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8