6、訊息釋出和訂閱功能
阿新 • • 發佈:2019-03-24
一:介紹
redis提供了簡單的釋出訂閱功能,producer往某個channel推送,client訂閱指定的channel(可以模糊匹配),這樣就能夠消費。
redis和rabbitmq的區別
-
可靠性
-
redis :沒有相應的機制保證訊息的可靠消費,如果釋出者釋出一條訊息,而沒有對應的訂閱者的話,這條訊息將丟失,不會存在記憶體中;
-
rabbitmq:具有訊息消費確認機制,如果釋出一條訊息,還沒有消費者消費該佇列,那麼這條訊息將一直存放在佇列中,直到有消費者消費了該條訊息,以此可以保證訊息的可靠消費,那麼rabbitmq的訊息是如何儲存的呢?(後續更新);
-
-
實時性
- redis:實時性高,redis作為高效的快取伺服器,所有資料都存在在伺服器中,所以它具有更高的實時性
-
消費者負載均衡:
-
rabbitmq佇列可以被多個消費者同時監控消費,但是每一條訊息只能被消費一次,由於rabbitmq的消費確認機制,因此它能夠根據消費者的消費能力而調整它的負載;
-
redis釋出訂閱模式,一個佇列可以被多個消費者同時訂閱,當有訊息到達時,會將該訊息依次傳送給每個訂閱者;
-
-
永續性
-
redis:redis的持久化是針對於整個redis快取的內容,它有RDB和AOF兩種持久化方式(redis持久化方式,後續更新),可以將整個redis例項持久化到磁碟,以此來做資料備份,防止異常情況下導致資料丟失。
-
rabbitmq:佇列,訊息都可以選擇性持久化,持久化粒度更小,更靈活;
-
-
佇列監控
-
rabbitmq實現了後臺監控平臺,可以在該平臺上看到所有建立的佇列的詳細情況,良好的後臺管理平臺可以方面我們更好的使用;
-
redis沒有所謂的監控平臺。
-
-
總結
- redis: 輕量級,低延遲,高併發,低可靠性;
- rabbitmq:重量級,高可靠,非同步,不保證實時;
- rabbitmq是一個專門的AMQP協議佇列,他的優勢就在於提供可靠的佇列服務,並且可做到非同步,而redis主要是用於快取的,redis的釋出訂閱模組,可用於實現及時性,且可靠性低的功能。
二:釋出/訂閱程式碼例子
-
1、pom依賴
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
-
2、publish ==> 只管往chennel傳送資料
public class Publisher { public static void main(String[] args) { JedisPool pool = new JedisPool("192.168.50.10", 6279); while (true) { Jedis jedis = null; try { jedis = pool.getResource(); // 就這一句關鍵推送 jedis.publish("channel1", "我是channel1 的msg"); jedis.publish("channel2", "我是channel2 的msg"); } catch (Exception e) { e.printStackTrace(); } finally { if (jedis != null) { jedis.close(); } } } } }
-
3、Subscriber ==> 定義一個訂閱者處理類
public class Subscriber extends JedisPubSub { public Subscriber() { } [@Override](https://my.oschina.net/u/1162528) public void onMessage(String channel, String message) { //收到訊息會呼叫 System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message)); } [@Override](https://my.oschina.net/u/1162528) public void onSubscribe(String channel, int subscribedChannels) { //訂閱了頻道會呼叫 System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", channel, subscribedChannels)); } [@Override](https://my.oschina.net/u/1162528) public void onUnsubscribe(String channel, int subscribedChannels) {//取消訂閱 會呼叫 System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", channel, subscribedChannels)); } }
-
4、jedis訂閱某個主題,並且呼叫Subscriber來處理
public class SubMain { public static void main(String[] args) { JedisPool jedisPool = new JedisPool("192.168.50.10", 6279); Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.subscribe(new Subscriber(), "channel1", "channel2"); jedis.psubscribe(new Subscriber(), "*cha*"); } catch (Exception e) { e.printStackTrace(); } finally { if (jedis != null) { jedis.close(); } } } }