Redis用作訊息佇列
Redis不僅可作為快取伺服器,還可用作訊息佇列。它的列表型別天生支援用作訊息佇列。如下圖所示:
由於Redis的列表是使用雙向連結串列實現的,儲存了頭尾節點,所以在列表頭尾兩邊插取元素都是非常快的。
所以可以直接使用Redis的List實現訊息佇列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。簡單示例如下:
存放訊息端(訊息生產者):
訊息處理端(訊息消費者):
-
package org.yamikaze.redis.messsage.queue;
-
import org.yamikaze.redis.test.MyJedisFactory;
-
import redis.clients.jedis.Jedis;
-
/**
-
* 訊息消費者
-
* @author yamikaze
-
*/
-
public class Customer extends Thread{
-
private String customerName;
-
private volatile int count;
-
private Jedis jedis;
-
public Customer(String name) {
-
this.customerName = name;
-
init();
-
}
-
private void init() {
-
jedis = MyJedisFactory.getLocalJedis();
-
}
-
public void processMessage() {
-
String message = jedis.rpop(Producer.MESSAGE_KEY);
-
if(message != null) {
-
count++;
-
handle(message);
-
}
-
}
-
public void handle(String message) {
-
System.out.println(customerName + " 正在處理訊息,訊息內容是: " + message + " 這是第" + count + "條");
-
}
-
@Override
-
public void run() {
-
while (true) {
-
processMessage();
-
}
-
}
-
public static void main(String[] args) {
-
Customer customer = new Customer("yamikaze");
-
customer.start();
-
}
-
}
但上述例子中訊息消費者有一個問題存在,即需要不停的呼叫rpop方法檢視List中是否有待處理訊息。每呼叫一次都會發起一次連線,這會造成不必要的浪費。也許你會使用Thread.sleep()等方法讓消費者執行緒隔一段時間再消費,但這樣做有兩個問題:
1)、如果生產者速度大於消費者消費速度,訊息佇列長度會一直增大,時間久了會佔用大量記憶體空間。
2)、如果睡眠時間過長,這樣不能處理一些時效性的訊息,睡眠時間過短,也會在連線上造成比較大的開銷。
所以可以使用brpop指令,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,於是消費端可以將processMessage可以改為這樣:
-
public void processMessage() {
-
/**
-
* brpop支援多個列表(佇列)
-
* brpop指令是支援佇列優先順序的,比如這個例子中MESSAGE_KEY的優先順序大於testKey(順序決定)。
-
* 如果兩個列表中都有元素,會優先返回優先順序高的列表中的元素,所以這兒優先返回MESSAGE_KEY
-
* 0表示不限制等待,會一直阻塞在這兒
-
*/
-
List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");
-
if(messages.size() != 0) {
-
//由於該指令可以監聽多個Key,所以返回的是一個列表
-
//列表由2項組成,1) 列表名,2)資料
-
String keyName = messages.get(0);
-
//如果返回的是MESSAGE_KEY的訊息
-
if(Producer.MESSAGE_KEY.equals(keyName)) {
-
String message = messages.get(1);
-
handle(message);
-
}
-
}
-
System.out.println("=======================");
-
}
然後可以執行Customer,清空控制檯,可以看到程式沒有任何輸出,阻塞在了brpop這兒。然後在開啟Redis的客戶端,輸入指令client list,可以檢視當前有兩個連線。
釋出/訂閱模式
Redis除了對訊息佇列提供支援外,還提供了一組命令用於支援釋出/訂閱模式。
1)釋出
PUBLISH指令可用於釋出一條訊息,格式 PUBLISH channel message
返回值表示訂閱了該訊息的數量。
2)訂閱
SUBSCRIBE指令用於接收一條訊息,格式 SUBSCRIBE channel
可以看到使用SUBSCRIBE指令後進入了訂閱模式,但沒有接收到publish傳送的訊息,這是因為只有在訊息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回覆。回覆分為三種類型:
1、如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是第幾個訂閱的頻道?(理解成序號?)
2、如果為message(訊息),第二個值為產生該訊息的頻道,第三個值為訊息
3、如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示當前客戶端的訂閱數量。
可以使用指令UNSUBSCRIBE退訂,如果不加引數,則會退訂所有由SUBSCRIBE指令訂閱的頻道。
Redis還支援基於萬用字元的訊息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:
再試試推送訊息會得到以下結果:
可以看到publish指令返回的是2,而訂閱端這邊接收了兩次訊息。這是因為PSUBSCRIBE指令可以重複訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。同時PUNSUBSCRIBE指令萬用字元不會展開。
例如:PUNSUBSCRIBE * 不會匹配到 channel.*, 所以要取消訂閱channel.*就要這樣寫PUBSUBSCRIBE channel.*。
程式碼示範如下:
-
package org.yamikaze.redis.messsage.subscribe;
-
import org.yamikaze.redis.messsage.queue.StringUtils;
-
import org.yamikaze.redis.test.MyJedisFactory;
-
import redis.clients.jedis.Jedis;
-
/**
-
* 訊息釋出方
-
* @author yamikaze
-
*/
-
public class Publisher {
-
public static final String CHANNEL_KEY = "channel:message";
-
private Jedis jedis;
-
public Publisher() {
-
jedis = MyJedisFactory.getLocalJedis();
-
}
-
public void publishMessage(String message) {
-
if(StringUtils.isBlank(message)) {
-
return;
-
}
-
jedis.publish(CHANNEL_KEY, message);
-
}
-
public static void main(String[] args) {
-
Publisher publisher = new Publisher();
-
publisher.publishMessage("Hello Redis!");
-
}
-
}
簡單的傳送一個訊息。
訊息訂閱方:
-
package org.yamikaze.redis.messsage.subscribe;
-
import org.yamikaze.redis.test.MyJedisFactory;
-
import redis.clients.jedis.Jedis;
-
import redis.clients.jedis.JedisPubSub;
-
import java.util.concurrent.TimeUnit;
-
/**
-
* 訊息訂閱方客戶端
-
* @author yamikaze
-
*/
-
public class SubscribeClient {
-
private Jedis jedis;
-
private static final String EXIT_COMMAND = "exit";
-
public SubscribeClient() {
-
jedis = MyJedisFactory.getLocalJedis();
-
}
-
public void subscribe(String ...channel) {
-
if(channel == null || channel.length <= 0) {
-
return;
-
}
-
//訊息處理,接收到訊息時如何處理
-
JedisPubSub jps = new JedisPubSub() {
-
/**
-
* JedisPubSub類是一個沒有抽象方法的抽象類,裡面方法都是一些空實現
-
* 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage
-
* 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法
-
* 當然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法引數為byte[]
-
*/
-
@Override
-
public void onMessage(String channel, String message) {
-
if(Publisher.CHANNEL_KEY.equals(channel)) {
-
System.out.println("接收到訊息: channel : " + message);
-
//接收到exit訊息後退出
-
if(EXIT_COMMAND.equals(message)) {
-
System.exit(0);
-
}
-
}
-
}
-
/**
-
* 訂閱時
-
*/
-
@Override
-
public void onSubscribe(String channel, int subscribedChannels) {
-
if(Publisher.CHANNEL_KEY.equals(channel)) {
-
System.out.println("訂閱了頻道:" + channel);
-
}
-
}
-
};
-
//可以訂閱多個頻道 當前執行緒會阻塞在這兒
-
jedis.subscribe(jps, channel);
-
}
-
public static void main(String[] args) {
-
SubscribeClient client = new SubscribeClient();
-
client.subscribe(Publisher.CHANNEL_KEY);
-
//並沒有 unsubscribe方法
-
//相應的也沒有punsubscribe方法
-
}
-
}
先執行client,再執行Publisher進行訊息傳送,輸出結果:
總結:
使用Redis的List資料結構可以簡單迅速地做一個訊息佇列,同時Redis提供的BRPOP和BLPOP等指令解決了頻繁呼叫Jedis的rpop和lpop方法造成的資源浪費問題。除此之外,Redis提供對釋出/訂閱模式的指令,可以實現訊息傳遞、程序間通訊。