使用Redis的Pub/Sub來實現類似於JMS的訊息持久化
阿新 • • 發佈:2019-01-31
關於個人對Redis提供的Pub/Sub機制的認識在上一篇部落格中涉及到了,也提到了關於如何避免Redis的Pub/Sub的一個最大的缺陷的思路—訊息的持久化(http://blog.csdn.net/canot/article/details/51975566)。這篇文章主要是關於其思路(Redis的Pub/Sub的訊息持久化)的程式碼實現:
Pub/Sub機制中最核心的Listener的實現:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.util.RedisInputStream;
public class PubSubListener extends JedisPubSub {
private String clientId;
private HandlerRedis handlerRedis;
// 生成PubSubListener的時候必須制定一個id
public PubSubListener(String cliendId, Jedis jedis) {
this.clientId = cliendId;
jedis.auth("xxxx");
handlerRedis = new HandlerRedis(jedis);
}
@Override
public void onMessage(String channel, String message) {
if ("quit".equals(message)) {
this.unsubscribe(channel);
}
handlerRedis.handler(channel, message);
}
// 真正處理接受的地方
private void message(String channel, String message) {
System.out.println("message receive:" + message + ",channel:" + channel + "...");
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// TODO Auto-generated method stub
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// 將訂閱者儲存在一個"訂閱活躍者集合中"
handlerRedis.subscribe(channel);
System.out.println("subscribe:" + channel);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
handlerRedis.ubsubscribe(channel);
System.out.println("unsubscribe:" + channel);
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
class HandlerRedis {
private Jedis jedis;
public HandlerRedis(Jedis jedis) {
this.jedis = jedis;
}
public void handler(String channel, String message) {
int index = message.indexOf("/");
if (index < 0) {
// 訊息不合法,丟棄
return;
}
Long txid = Long.valueOf(message.substring(0, index));
String key = clientId + "/" + channel;
while (true) {
String lm = jedis.lindex(key, 0);// 獲取第一個訊息
if (lm == null) {
break;
}
int li = lm.indexOf("/");
// 如果訊息不合法,刪除並處理
if (li < 0) {
String result = jedis.lpop(key);// 刪除當前message
// 為空
if (result == null) {
break;
}
message(channel, lm);
continue;
}
Long lxid = Long.valueOf(lm.substring(0, li));// 獲取訊息的txid
// 直接消費txid之前的殘留訊息
if (txid >= lxid) {
jedis.lpop(key);// 刪除當前message
message(channel, lm);
continue;
} else {
break;
}
}
}
// 持久化訂閱操作
public void subscribe(String channel) {
// 保證在訂閱者集合中的格式為 唯一識別符號/訂閱的通道
String key = clientId + "/" + channel;
// 判斷該客戶端是否在集合中存在
boolean isExist = jedis.sismember("PERSIS_SUB", key);
if (!isExist) {
// 不存在則新增
jedis.sadd("PERSIS_SUB", key);
}
}
public void ubsubscribe(String channel) {
String key = clientId + "/" + channel;
// 從“活躍訂閱者”集合中
jedis.srem("PERSIS_SUB", key);
// 刪除“訂閱者訊息佇列”
jedis.del(channel);
}
}
}
Listener中定義了一個內部類HandlerRedis。Listener類將onMessage以及onSubscribe兩個方法交付於HandlerRedis。Handler處理這個方法的時候也即進行著佇列的維護。Listener類中定義了一個message()方法,該方法是handler的回撥方法,即真正的處理訊息的地方。
通道的訂閱客戶端類:
import redis.clients.jedis.Jedis;
public class SubClient {
private Jedis jedis;
private PubSubListener listener;
public SubClient(String host,PubSubListener pubSubListener){
jedis = new Jedis(host);
jedis.auth("XXXXX");
this.listener = pubSubListener;
}
public void sub(String channel){
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
listener.unsubscribe(channel);
}
}
通道的訊息釋出的客戶端:
import java.util.Set;
import redis.clients.jedis.Jedis;
public class PubClient {
private Jedis jedis;
public PubClient(String host) {
jedis = new Jedis(host);
jedis.auth("wx950709");
}
/**
* 釋出的每條訊息,都需要在“訂閱者訊息佇列”中持久
*
* @param message
*/
public void put(String message) {
//獲取所有活躍的訊息接收者客戶端 clientID/channel
Set<String> subClients = jedis.smembers("PERSIS_SUB");
for (String subs : subClients) {
// 儲存每個客戶端的訊息
jedis.rpush(subs, message);
}
}
public void publish(String channel, String message) {
// 每個訊息,都有具有一個全域性唯一的id
// txid為了防止訂閱端在資料處理時“亂序”,這就要求訂閱者需要解析message
Long txid = jedis.incr("MESSAGE_TXID");
String content = txid + "/" + message;
this.put(content);
jedis.publish(channel, content);//為每個訊息設定id,最終訊息格式1000/messageContent
}
public void close(String channel){
jedis.publish(channel, "quit");
jedis.del(channel);//刪除
}
}
測試引導類:
import redis.clients.jedis.Jedis;
public class Main {
public static void main(String[] args) throws Exception{
PubClient pubClient = new PubClient("127.0.0.1");
final String channel = "pubsub-channel222";
PubSubListener listener = new PubSubListener("client_one", new Jedis("127.0.0.1"));
SubClient subClient = new SubClient("127.0.0.1", listener);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
//在API級別,此處為輪詢操作,直到unsubscribe呼叫,才會返回
subClient.sub(channel);
}
});
t1.setDaemon(true);
t1.start();
int i = 0;
while(i < 2){
pubClient.publish(channel, "message"+i);
i++;
Thread.sleep(1000);
}
}
}