Redis: Jedis中publish/subscribe 使用
在Redis早期版本就已經提供publish/subscribe 模式,該文使用Jedis客戶端的一個小例子.
Jedis 類中提供:
在Jedis中提供 釋出二進位制編碼 ,string字串 以及pattern匹配模式三種方式來發布publish訊息.
public Long publish(final String channel, final String message);
public Long publish(byte[] channel, byte[] message);
public List<String> pubsubChannels(String pattern) ;
同時提供 二進位制編碼和string字串來訂閱訊息.
public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels)
public void subscribe(final JedisPubSub jedisPubSub, final String... channels)
在訂閱訊息中涉及到2個重要類.BinaryJedisPubSub 和JedisPubSub 類,這2個類用來處理收到訊息時,對訊息的邏輯處理.
public abstract class JedisPubSub {
}
public abstract class BinaryJedisPubSub {
}
這兩個類為抽象類必須通過使用者來實現該類. 這兩個類中分別有重要的方法onMessage 當收到訊息時需要處理.
public void onMessage(byte[] channel, byte[] message) {
}
public void onMessage(String channel, String message) {
}
PublishMsg.Java 釋出訊息端:
- Jedis jedis = new Jedis("localhost");
- //釋出Protocol Buffer 協議訊息
-
Builder builder = UserBean.newBuilder();
- builder.setId(1000);
- UserBean userbean = builder.build();
- ByteArrayOutputStream output =new ByteArrayOutputStream();
- userbean.writeTo(output);
- long loop=0;
- while (loop++<10000) {
- //釋出userbean 二進位制訊息
- jedis.publish("userbean".getBytes(), output.toByteArray());
- Thread.sleep(1000);
- }
- jedis.disconnect();
SubscribeMsg.java 訂閱訊息端:
- Jedis jedis = new Jedis("localhost");
- //業務邏輯處理
- UserBeanListener l =new UserBeanListener();
- //訂閱userbean二進位制訊息
- jedis.subscribe(l, "userbean".getBytes());
- long loop=0;
- while (loop++<10000)
- {
- Thread.sleep(1000);
- }
- jedis.disconnect();
UserBeanListener.java 業務訊息處理:
- publicclass UserBeanListener extends BinaryJedisPubSub {
- @Override
- publicvoid onMessage(byte[] channel, byte[] message) {
- try {
- UserBean u = UserBean.parseFrom(message);
- System.out.println(u.getId());
- } catch (InvalidProtocolBufferException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
UserMsg.proto ProtocolBuffer協議檔案:
- message UserBean{
- // ID(必需)
- required int32 id = 1;
- }