1. 程式人生 > >Redis實戰總結-管道和釋出、訂閱機制

Redis實戰總結-管道和釋出、訂閱機制

本篇部落格重點介紹Redis的管道,釋出/訂閱機制。
Redis是一種基於Client-Server模型以及請求/響應協議的TCP服務。Client端發出請求,server端處理並返回結果到客戶端。在這個過程中Client端是以阻塞形式等待服務端的響應。假設從Client傳送命令到收到Server的處理結果需要1/16秒,這樣帶來的結果是Client每秒只能傳送16條命令,即使Redis每秒可以處理幾百個命令,嚴重影響了Redis的使用效率。因而針對上述問題Redis實現以下三種機制可以批量式的處理命令(通過減少Client和Server之間的網路通訊次數來提升Redis在執行多個命令的效能)。

  • 處理多引數的命令:

    比如MGET,MSET,HMGET,HMSET,RPUSH等命令可以接受多個引數,這樣能夠極大的提升效能,但是這些命令只能處理需要重複執行相同命令的操作。

  • Multi和Exec命令:將多個命令封裝成一個事務,以事務的方式提交,然後等待所有回覆出現。但是這種方式仍然會消耗資源,並且可能會導致其他重要的命令被延遲執行。正常會和Watch、Unwatch、Discard命令結合使用,確保自己正在使用的資料沒有發生變化來避免資料出錯。

  • 管道技術:本篇部落格重點介紹的。

Redis的管道機制

在Server端未響應時,Client端可以連續式的向Server端傳送命令,並最終一次性讀取Server端的所有響應,各個命令之間互不干擾。顯然這種方式可以顯著性地提高了 redis 服務的效能。我們把這種方式稱為:非事務性流水線方式。
下面給出一個具體的測試例項:

    /**
     * 設定給定鍵的值
     * @param key
     * @param value
     */
    public static void set(String key, String value) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.set(key, value);
        } finally {
            safeReturn(jedis);
        }
    }

    /**
     * 測試使用管道的效率
     * @param
key */
public static void batchTestPipeLine(String key) { Jedis jedis = jedisPool.getResource(); try { Pipeline pipeline = jedis.pipelined(); for (int i = 0; i < 10000; i++) { Response<Long> returns = pipeline.incr(key); } pipeline.syncAndReturnAll(); } finally { safeReturn(jedis); } } /** * 測試不使用管道的效率 * @param key */ public static void batchTestWithNonePipeLine(String key) { Jedis jedis = jedisPool.getResource(); try { for (int i = 0; i < 10000; i++) { jedis.incr(key); } } finally { safeReturn(jedis); } }

主程式測試:

import org.apache.log4j.Logger;
import util.JedisUtil;

public class Application2 {

    private static Logger logger = Logger.getLogger(Application.class);

    private static String PIPELINE_HAS = "PIPELINE_HAS";
    private static String PIPELINE_NONE = "PIPELINE_NONE";

    public static void main(String[] args) {

        logger.info("主程式開始執行:");
        Long beginTime = System.currentTimeMillis();

        JedisUtil.init();

        JedisUtil.set(PIPELINE_HAS, "0");
        JedisUtil.set(PIPELINE_NONE, "0");

        JedisUtil.batchTestWithNonePipeLine(PIPELINE_NONE);
        Long endTime1 = System.currentTimeMillis();

        JedisUtil.batchTestWithNonePipeLine(PIPELINE_HAS);
        Long endTime2 = System.currentTimeMillis();

        logger.info("未使用管道技術,消耗時間:"+(endTime1-beginTime));
        logger.info("使用管道技術,消耗時間:"+(endTime2-endTime1));
    }
}

程式執行結果:
這裡寫圖片描述
通過測試發現,使用通道技術和不使用差距還是很明顯的。

Redis釋出和訂閱機制

Redis的釋出與訂閱機制由兩部分組成:釋出者,訂閱者。釋出者(publisher)負責向頻道傳送二進位制字串訊息;訂閱者(subscriber)負責訂閱頻道。每當釋出者將訊息傳送至給定頻道時,頻道的所有訂閱者都會收到訊息。

Redis的釋出和訂閱是一種訊息通訊模式,可以解除訊息釋出者和訊息訂閱者之間的耦,類似於設計模式中觀察者模式。

Redis中釋出和訂閱命令

命令 用例及描述
SUBSCRIBE SUBSCRIBE channel [channel …]——訂閱給定的一個或多個頻道
UNSUBSCRIBE UNSUBSCRIBE [channel [channel …]]——退訂給定的一個或多個頻道,如果沒有給定任何頻道,退訂所有
PUBLISH PUBLISH channel message——向給定頻道傳送訊息
PSUBSCRIBE PSUBSCRIBE pattern [pattern …]——訂閱給定模式相匹配的所有頻道
PUNSUBSCRIBE PUNSUBSCRIBE [pattern [pattern …]]——退訂給定的模式,如果沒有給定任何模式,退訂所有

JAVA中釋出和訂閱的實現

重寫類JedisPubSub中onMessage方法

package listener;

import redis.clients.jedis.JedisPubSub;

import java.util.logging.Logger;

/**
 * 重寫Redis的PUB和SUB方法
 * @author guweiyu
 */
public class RedisPubSubListener extends JedisPubSub {

    Logger logger = Logger.getLogger(RedisPubSubListener.class.getName());

    @Override
    public void onMessage(String channel, String message) {
        logger.info("channel:" + channel + "receives message :" + message);
        this.unsubscribe(channel);
    }

    public void unsubscribe(String channel) {
        logger.info("unsubscribe channel:"+channel);
        super.unsubscribe(channel);
    }
}
    /**
     * 訂閱頻道
     * @param jedisPubSub
     * @param channel
     */
    public static void subscribe(JedisPubSub jedisPubSub, String channel) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.subscribe(jedisPubSub, channel);
        } finally {
            safeReturn(jedis);
        }
    }

    /**
     * 向頻道釋出資訊
     * @param channel
     * @param msg
     */
    public static void publish(String channel, String msg) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.publish(channel, msg);
        } finally {
            safeReturn(jedis);
        }
    }

編寫PUB端測試案例

import org.apache.log4j.Logger;
import util.JedisUtil;

public class ApplicationPub {

    private static Logger logger = Logger.getLogger(ApplicationPub.class);

    public static void main(String[] args) throws Exception{

        logger.info("Main Application is starting");
        JedisUtil.init();
        JedisUtil.publish("channel_test", "hello, world");
        Thread.sleep(5000);
        JedisUtil.publish("channel_test", "hello, china");
    }
}

編寫SUB端測試案例

import listener.RedisPubSubListener;
import org.apache.log4j.Logger;
import util.JedisUtil;

public class ApplicationSub {

    private static Logger logger = Logger.getLogger(ApplicationSub.class);

    public static void main(String[] args) {

        logger.info("Main Application is starting");
        JedisUtil.init();

        // 訂閱頻道
        RedisPubSubListener listener = new RedisPubSubListener();
        // Redis的subscribe是阻塞式方法,在取消訂閱該頻道前,會一直阻塞在這
        JedisUtil.subscribe(listener, "channel_test");
        logger.info("訂閱結束");
    }
}

先啟動SUB端,再啟動PUB端,執行結果如下:
這裡寫圖片描述
這裡寫圖片描述

在實際中直接使用Redis的PUB和SUB機制處理訊息相對較少,由於和資料傳輸的可靠性有關。如果客戶端在執行訂閱操作的過程中斷線,那麼客戶端將會丟失在斷線期間傳送的所有訊息。