1. 程式人生 > >使用Redis的Pub/Sub來實現類似於JMS的訊息持久化

使用Redis的Pub/Sub來實現類似於JMS的訊息持久化

關於個人對Redis提供的Pub/Sub機制的認識在上一篇部落格中涉及到了,也提到了關於如何避免Redis的Pub/Sub的一個最大的缺陷的思路—訊息的持久化(http://blog.csdn.net/canot/article/details/51975566)。這篇文章主要是關於其思路(Redis的Pub/Sub的訊息持久化)的程式碼實現:

Pub/Sub機制中最核心的Listener的實現:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.util.RedisInputStream;

public
class PubSubListener extends JedisPubSub { private String clientId; private HandlerRedis handlerRedis; // 生成PubSubListener的時候必須制定一個id public PubSubListener(String cliendId, Jedis jedis) { this.clientId = cliendId; jedis.auth("xxxx"); handlerRedis = new HandlerRedis(jedis); } @Override
public void onMessage(String channel, String message) { if ("quit".equals(message)) { this.unsubscribe(channel); } handlerRedis.handler(channel, message); } // 真正處理接受的地方 private void message(String channel, String message) { System.out.println("message receive:"
+ message + ",channel:" + channel + "..."); } @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub } @Override public void onSubscribe(String channel, int subscribedChannels) { // 將訂閱者儲存在一個"訂閱活躍者集合中" handlerRedis.subscribe(channel); System.out.println("subscribe:" + channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { handlerRedis.ubsubscribe(channel); System.out.println("unsubscribe:" + channel); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onPSubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub } class HandlerRedis { private Jedis jedis; public HandlerRedis(Jedis jedis) { this.jedis = jedis; } public void handler(String channel, String message) { int index = message.indexOf("/"); if (index < 0) { // 訊息不合法,丟棄 return; } Long txid = Long.valueOf(message.substring(0, index)); String key = clientId + "/" + channel; while (true) { String lm = jedis.lindex(key, 0);// 獲取第一個訊息 if (lm == null) { break; } int li = lm.indexOf("/"); // 如果訊息不合法,刪除並處理 if (li < 0) { String result = jedis.lpop(key);// 刪除當前message // 為空 if (result == null) { break; } message(channel, lm); continue; } Long lxid = Long.valueOf(lm.substring(0, li));// 獲取訊息的txid // 直接消費txid之前的殘留訊息 if (txid >= lxid) { jedis.lpop(key);// 刪除當前message message(channel, lm); continue; } else { break; } } } // 持久化訂閱操作 public void subscribe(String channel) { // 保證在訂閱者集合中的格式為 唯一識別符號/訂閱的通道 String key = clientId + "/" + channel; // 判斷該客戶端是否在集合中存在 boolean isExist = jedis.sismember("PERSIS_SUB", key); if (!isExist) { // 不存在則新增 jedis.sadd("PERSIS_SUB", key); } } public void ubsubscribe(String channel) { String key = clientId + "/" + channel; // 從“活躍訂閱者”集合中 jedis.srem("PERSIS_SUB", key); // 刪除“訂閱者訊息佇列” jedis.del(channel); } } }

Listener中定義了一個內部類HandlerRedis。Listener類將onMessage以及onSubscribe兩個方法交付於HandlerRedis。Handler處理這個方法的時候也即進行著佇列的維護。Listener類中定義了一個message()方法,該方法是handler的回撥方法,即真正的處理訊息的地方。

通道的訂閱客戶端類:

import redis.clients.jedis.Jedis;

public class SubClient {
   private Jedis jedis;
   private PubSubListener listener;

   public SubClient(String host,PubSubListener pubSubListener){
       jedis = new Jedis(host);
       jedis.auth("XXXXX");
       this.listener = pubSubListener;
   }

   public void sub(String channel){
        jedis.subscribe(listener, channel);
    }

    public void unsubscribe(String channel){
        listener.unsubscribe(channel);
    }

}

通道的訊息釋出的客戶端:

import java.util.Set;

import redis.clients.jedis.Jedis;

public class PubClient {
    private Jedis jedis;

    public PubClient(String host) {
        jedis = new Jedis(host);
        jedis.auth("wx950709");
    }

    /**
     * 釋出的每條訊息,都需要在“訂閱者訊息佇列”中持久
     * 
     * @param message
     */
    public void put(String message) {
        //獲取所有活躍的訊息接收者客戶端 clientID/channel
        Set<String> subClients = jedis.smembers("PERSIS_SUB");
        for (String subs : subClients) {
            // 儲存每個客戶端的訊息
            jedis.rpush(subs, message);
        }
    }

    public void publish(String channel, String message) {
        // 每個訊息,都有具有一個全域性唯一的id
        // txid為了防止訂閱端在資料處理時“亂序”,這就要求訂閱者需要解析message
        Long txid = jedis.incr("MESSAGE_TXID");
        String content = txid + "/" + message;
        this.put(content);
        jedis.publish(channel, content);//為每個訊息設定id,最終訊息格式1000/messageContent
    }
    public void close(String channel){
        jedis.publish(channel, "quit");
        jedis.del(channel);//刪除
    }
}

測試引導類:

import redis.clients.jedis.Jedis;

public class Main {
    public static void main(String[] args) throws Exception{
        PubClient pubClient = new PubClient("127.0.0.1");
        final String channel = "pubsub-channel222";
        PubSubListener listener = new PubSubListener("client_one", new Jedis("127.0.0.1"));
        SubClient subClient = new SubClient("127.0.0.1", listener);
        Thread t1 = new Thread(new Runnable() {

            @Override
            public void run() {
                //在API級別,此處為輪詢操作,直到unsubscribe呼叫,才會返回
                subClient.sub(channel); 
            }

        });
        t1.setDaemon(true);
        t1.start();

        int i = 0;
        while(i < 2){
            pubClient.publish(channel, "message"+i);
            i++;
            Thread.sleep(1000);
        }
    }
}