1. 程式人生 > >Redis: Jedis中publish/subscribe 使用

Redis: Jedis中publish/subscribe 使用

在Redis早期版本就已經提供publish/subscribe 模式,該文使用Jedis客戶端的一個小例子.

Jedis 類中提供:

在Jedis中提供 釋出二進位制編碼 ,string字串 以及pattern匹配模式三種方式來發布publish訊息.

public Long publish(final String channel, final String message);

public Long publish(byte[] channel, byte[] message);

public List<String> pubsubChannels(String pattern) ;

同時提供 二進位制編碼和string字串來訂閱訊息.

public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels)

public void subscribe(final JedisPubSub jedisPubSub, final String... channels)

在訂閱訊息中涉及到2個重要類.BinaryJedisPubSub 和JedisPubSub 類,這2個類用來處理收到訊息時,對訊息的邏輯處理.

public abstract class JedisPubSub {

}

public abstract class BinaryJedisPubSub {

}

這兩個類為抽象類必須通過使用者來實現該類. 這兩個類中分別有重要的方法onMessage 當收到訊息時需要處理.

public void onMessage(byte[] channel, byte[] message) {
  }

  public void onMessage(String channel, String message) {
  }

PublishMsg.Java  釋出訊息端:

  1.    Jedis jedis = new Jedis("localhost");  
  2.    //釋出Protocol Buffer 協議訊息
  3.    Builder builder = UserBean.newBuilder();  
  4.    builder.setId(1000);  
  5.    UserBean userbean = builder.build();  
  6.    ByteArrayOutputStream output =new ByteArrayOutputStream();  
  7.    userbean.writeTo(output);  
  8.    long loop=0;  
  9. while (loop++<10000) {  
  10.     //釋出userbean 二進位制訊息
  11.     jedis.publish("userbean".getBytes(), output.toByteArray());  
  12.     Thread.sleep(1000);  
  13. }   
  14.    jedis.disconnect();    


SubscribeMsg.java 訂閱訊息端:

  1.    Jedis jedis = new Jedis("localhost");  
  2.       //業務邏輯處理
  3.    UserBeanListener l =new UserBeanListener();  
  4.    //訂閱userbean二進位制訊息
  5.    jedis.subscribe(l, "userbean".getBytes());  
  6.    long loop=0;  
  7. while (loop++<10000)  
  8. {  
  9.     Thread.sleep(1000);  
  10. }   
  11.    jedis.disconnect();    


UserBeanListener.java 業務訊息處理:

  1. publicclass UserBeanListener extends BinaryJedisPubSub {  
  2.     @Override
  3.     publicvoid onMessage(byte[] channel, byte[] message) {  
  4.         try {  
  5.             UserBean u = UserBean.parseFrom(message);  
  6.             System.out.println(u.getId());  
  7.         } catch (InvalidProtocolBufferException e) {  
  8.             // TODO Auto-generated catch block
  9.             e.printStackTrace();  
  10.         }  
  11.     }  
  12. }  


UserMsg.proto  ProtocolBuffer協議檔案:

  1. message UserBean{  
  2.     // ID(必需)
  3.     required int32 id = 1;  
  4. }