Redis實戰總結-管道和釋出、訂閱機制
本篇部落格重點介紹Redis的管道,釋出/訂閱機制。
Redis是一種基於Client-Server模型以及請求/響應協議的TCP服務。Client端發出請求,server端處理並返回結果到客戶端。在這個過程中Client端是以阻塞形式等待服務端的響應。假設從Client傳送命令到收到Server的處理結果需要1/16秒,這樣帶來的結果是Client每秒只能傳送16條命令,即使Redis每秒可以處理幾百個命令,嚴重影響了Redis的使用效率。因而針對上述問題Redis實現以下三種機制可以批量式的處理命令(通過減少Client和Server之間的網路通訊次數來提升Redis在執行多個命令的效能)。
處理多引數的命令:
Multi和Exec命令:將多個命令封裝成一個事務,以事務的方式提交,然後等待所有回覆出現。但是這種方式仍然會消耗資源,並且可能會導致其他重要的命令被延遲執行。正常會和Watch、Unwatch、Discard命令結合使用,確保自己正在使用的資料沒有發生變化來避免資料出錯。
管道技術:本篇部落格重點介紹的。
Redis的管道機制
在Server端未響應時,Client端可以連續式的向Server端傳送命令,並最終一次性讀取Server端的所有響應,各個命令之間互不干擾。顯然這種方式可以顯著性地提高了 redis 服務的效能。我們把這種方式稱為:非事務性流水線方式。
下面給出一個具體的測試例項:
/**
* 設定給定鍵的值
* @param key
* @param value
*/
public static void set(String key, String value) {
Jedis jedis = jedisPool.getResource();
try {
jedis.set(key, value);
} finally {
safeReturn(jedis);
}
}
/**
* 測試使用管道的效率
* @param key
*/
public static void batchTestPipeLine(String key) {
Jedis jedis = jedisPool.getResource();
try {
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < 10000; i++) {
Response<Long> returns = pipeline.incr(key);
}
pipeline.syncAndReturnAll();
} finally {
safeReturn(jedis);
}
}
/**
* 測試不使用管道的效率
* @param key
*/
public static void batchTestWithNonePipeLine(String key) {
Jedis jedis = jedisPool.getResource();
try {
for (int i = 0; i < 10000; i++) {
jedis.incr(key);
}
} finally {
safeReturn(jedis);
}
}
主程式測試:
import org.apache.log4j.Logger;
import util.JedisUtil;
public class Application2 {
private static Logger logger = Logger.getLogger(Application.class);
private static String PIPELINE_HAS = "PIPELINE_HAS";
private static String PIPELINE_NONE = "PIPELINE_NONE";
public static void main(String[] args) {
logger.info("主程式開始執行:");
Long beginTime = System.currentTimeMillis();
JedisUtil.init();
JedisUtil.set(PIPELINE_HAS, "0");
JedisUtil.set(PIPELINE_NONE, "0");
JedisUtil.batchTestWithNonePipeLine(PIPELINE_NONE);
Long endTime1 = System.currentTimeMillis();
JedisUtil.batchTestWithNonePipeLine(PIPELINE_HAS);
Long endTime2 = System.currentTimeMillis();
logger.info("未使用管道技術,消耗時間:"+(endTime1-beginTime));
logger.info("使用管道技術,消耗時間:"+(endTime2-endTime1));
}
}
程式執行結果:
通過測試發現,使用通道技術和不使用差距還是很明顯的。
Redis釋出和訂閱機制
Redis的釋出與訂閱機制由兩部分組成:釋出者,訂閱者。釋出者(publisher)負責向頻道傳送二進位制字串訊息;訂閱者(subscriber)負責訂閱頻道。每當釋出者將訊息傳送至給定頻道時,頻道的所有訂閱者都會收到訊息。
Redis的釋出和訂閱是一種訊息通訊模式,可以解除訊息釋出者和訊息訂閱者之間的耦,類似於設計模式中觀察者模式。
Redis中釋出和訂閱命令
命令 | 用例及描述 |
---|---|
SUBSCRIBE | SUBSCRIBE channel [channel …]——訂閱給定的一個或多個頻道 |
UNSUBSCRIBE | UNSUBSCRIBE [channel [channel …]]——退訂給定的一個或多個頻道,如果沒有給定任何頻道,退訂所有 |
PUBLISH | PUBLISH channel message——向給定頻道傳送訊息 |
PSUBSCRIBE | PSUBSCRIBE pattern [pattern …]——訂閱給定模式相匹配的所有頻道 |
PUNSUBSCRIBE | PUNSUBSCRIBE [pattern [pattern …]]——退訂給定的模式,如果沒有給定任何模式,退訂所有 |
JAVA中釋出和訂閱的實現
重寫類JedisPubSub中onMessage方法
package listener;
import redis.clients.jedis.JedisPubSub;
import java.util.logging.Logger;
/**
* 重寫Redis的PUB和SUB方法
* @author guweiyu
*/
public class RedisPubSubListener extends JedisPubSub {
Logger logger = Logger.getLogger(RedisPubSubListener.class.getName());
@Override
public void onMessage(String channel, String message) {
logger.info("channel:" + channel + "receives message :" + message);
this.unsubscribe(channel);
}
public void unsubscribe(String channel) {
logger.info("unsubscribe channel:"+channel);
super.unsubscribe(channel);
}
}
/**
* 訂閱頻道
* @param jedisPubSub
* @param channel
*/
public static void subscribe(JedisPubSub jedisPubSub, String channel) {
Jedis jedis = jedisPool.getResource();
try {
jedis.subscribe(jedisPubSub, channel);
} finally {
safeReturn(jedis);
}
}
/**
* 向頻道釋出資訊
* @param channel
* @param msg
*/
public static void publish(String channel, String msg) {
Jedis jedis = jedisPool.getResource();
try {
jedis.publish(channel, msg);
} finally {
safeReturn(jedis);
}
}
編寫PUB端測試案例
import org.apache.log4j.Logger;
import util.JedisUtil;
public class ApplicationPub {
private static Logger logger = Logger.getLogger(ApplicationPub.class);
public static void main(String[] args) throws Exception{
logger.info("Main Application is starting");
JedisUtil.init();
JedisUtil.publish("channel_test", "hello, world");
Thread.sleep(5000);
JedisUtil.publish("channel_test", "hello, china");
}
}
編寫SUB端測試案例
import listener.RedisPubSubListener;
import org.apache.log4j.Logger;
import util.JedisUtil;
public class ApplicationSub {
private static Logger logger = Logger.getLogger(ApplicationSub.class);
public static void main(String[] args) {
logger.info("Main Application is starting");
JedisUtil.init();
// 訂閱頻道
RedisPubSubListener listener = new RedisPubSubListener();
// Redis的subscribe是阻塞式方法,在取消訂閱該頻道前,會一直阻塞在這
JedisUtil.subscribe(listener, "channel_test");
logger.info("訂閱結束");
}
}
先啟動SUB端,再啟動PUB端,執行結果如下:
在實際中直接使用Redis的PUB和SUB機制處理訊息相對較少,由於和資料傳輸的可靠性有關。如果客戶端在執行訂閱操作的過程中斷線,那麼客戶端將會丟失在斷線期間傳送的所有訊息。