1. 程式人生 > >Redis用作訊息佇列

Redis用作訊息佇列

 Redis不僅可作為快取伺服器,還可用作訊息佇列。它的列表型別天生支援用作訊息佇列。如下圖所示:

    由於Redis的列表是使用雙向連結串列實現的,儲存了頭尾節點,所以在列表頭尾兩邊插取元素都是非常快的。

    所以可以直接使用Redis的List實現訊息佇列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。簡單示例如下:

存放訊息端(訊息生產者):

 

 
  1.  

訊息處理端(訊息消費者):

 

 
  1. package org.yamikaze.redis.messsage.queue;

  2.  
  3. import org.yamikaze.redis.test.MyJedisFactory;

  4. import redis.clients.jedis.Jedis;

  5.  
  6. /**

  7. * 訊息消費者

  8. * @author yamikaze

  9. */

  10. public class Customer extends Thread{

  11.  
  12. private String customerName;

  13. private volatile int count;

  14. private Jedis jedis;

  15.  
  16. public Customer(String name) {

  17. this.customerName = name;

  18. init();

  19. }

  20.  
  21. private void init() {

  22. jedis = MyJedisFactory.getLocalJedis();

  23. }

  24.  
  25. public void processMessage() {

  26. String message = jedis.rpop(Producer.MESSAGE_KEY);

  27. if(message != null) {

  28. count++;

  29. handle(message);

  30. }

  31. }

  32.  
  33. public void handle(String message) {

  34. System.out.println(customerName + " 正在處理訊息,訊息內容是: " + message + " 這是第" + count + "條");

  35. }

  36.  
  37. @Override

  38. public void run() {

  39. while (true) {

  40. processMessage();

  41. }

  42. }

  43.  
  44. public static void main(String[] args) {

  45. Customer customer = new Customer("yamikaze");

  46. customer.start();

  47. }

  48. }

   但上述例子中訊息消費者有一個問題存在,即需要不停的呼叫rpop方法檢視List中是否有待處理訊息。每呼叫一次都會發起一次連線,這會造成不必要的浪費。也許你會使用Thread.sleep()等方法讓消費者執行緒隔一段時間再消費,但這樣做有兩個問題:

 

    1)、如果生產者速度大於消費者消費速度,訊息佇列長度會一直增大,時間久了會佔用大量記憶體空間。

    2)、如果睡眠時間過長,這樣不能處理一些時效性的訊息,睡眠時間過短,也會在連線上造成比較大的開銷。

    所以可以使用brpop指令,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,於是消費端可以將processMessage可以改為這樣:

 

 
  1. public void processMessage() {

  2. /**

  3. * brpop支援多個列表(佇列)

  4. * brpop指令是支援佇列優先順序的,比如這個例子中MESSAGE_KEY的優先順序大於testKey(順序決定)。

  5. * 如果兩個列表中都有元素,會優先返回優先順序高的列表中的元素,所以這兒優先返回MESSAGE_KEY

  6. * 0表示不限制等待,會一直阻塞在這兒

  7. */

  8. List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");

  9. if(messages.size() != 0) {

  10. //由於該指令可以監聽多個Key,所以返回的是一個列表

  11. //列表由2項組成,1) 列表名,2)資料

  12. String keyName = messages.get(0);

  13. //如果返回的是MESSAGE_KEY的訊息

  14. if(Producer.MESSAGE_KEY.equals(keyName)) {

  15. String message = messages.get(1);

  16. handle(message);

  17. }

  18.  
  19. }

  20. System.out.println("=======================");

  21. }

    然後可以執行Customer,清空控制檯,可以看到程式沒有任何輸出,阻塞在了brpop這兒。然後在開啟Redis的客戶端,輸入指令client list,可以檢視當前有兩個連線。

 

釋出/訂閱模式

    Redis除了對訊息佇列提供支援外,還提供了一組命令用於支援釋出/訂閱模式。

    

    1)釋出

    PUBLISH指令可用於釋出一條訊息,格式 PUBLISH channel message

    返回值表示訂閱了該訊息的數量。

    2)訂閱

    SUBSCRIBE指令用於接收一條訊息,格式 SUBSCRIBE channel

    可以看到使用SUBSCRIBE指令後進入了訂閱模式,但沒有接收到publish傳送的訊息,這是因為只有在訊息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回覆。回覆分為三種類型:

    1、如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是第幾個訂閱的頻道?(理解成序號?) 

    2、如果為message(訊息),第二個值為產生該訊息的頻道,第三個值為訊息

    3、如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示當前客戶端的訂閱數量。

    可以使用指令UNSUBSCRIBE退訂,如果不加引數,則會退訂所有由SUBSCRIBE指令訂閱的頻道。

   

    Redis還支援基於萬用字元的訊息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

   再試試推送訊息會得到以下結果:

   可以看到publish指令返回的是2,而訂閱端這邊接收了兩次訊息。這是因為PSUBSCRIBE指令可以重複訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。同時PUNSUBSCRIBE指令萬用字元不會展開。

例如:PUNSUBSCRIBE * 不會匹配到 channel.*, 所以要取消訂閱channel.*就要這樣寫PUBSUBSCRIBE channel.*。

 

程式碼示範如下:

 
  1. package org.yamikaze.redis.messsage.subscribe;

  2.  
  3. import org.yamikaze.redis.messsage.queue.StringUtils;

  4. import org.yamikaze.redis.test.MyJedisFactory;

  5. import redis.clients.jedis.Jedis;

  6.  
  7. /**

  8. * 訊息釋出方

  9. * @author yamikaze

  10. */

  11. public class Publisher {

  12.  
  13. public static final String CHANNEL_KEY = "channel:message";

  14. private Jedis jedis;

  15.  
  16. public Publisher() {

  17. jedis = MyJedisFactory.getLocalJedis();

  18. }

  19.  
  20. public void publishMessage(String message) {

  21. if(StringUtils.isBlank(message)) {

  22. return;

  23. }

  24. jedis.publish(CHANNEL_KEY, message);

  25. }

  26.  
  27. public static void main(String[] args) {

  28. Publisher publisher = new Publisher();

  29. publisher.publishMessage("Hello Redis!");

  30. }

  31. }

    簡單的傳送一個訊息。

訊息訂閱方:

 
  1. package org.yamikaze.redis.messsage.subscribe;

  2.  
  3. import org.yamikaze.redis.test.MyJedisFactory;

  4. import redis.clients.jedis.Jedis;

  5. import redis.clients.jedis.JedisPubSub;

  6.  
  7. import java.util.concurrent.TimeUnit;

  8.  
  9. /**

  10. * 訊息訂閱方客戶端

  11. * @author yamikaze

  12. */

  13. public class SubscribeClient {

  14.  
  15. private Jedis jedis;

  16. private static final String EXIT_COMMAND = "exit";

  17.  
  18. public SubscribeClient() {

  19. jedis = MyJedisFactory.getLocalJedis();

  20. }

  21.  
  22. public void subscribe(String ...channel) {

  23. if(channel == null || channel.length <= 0) {

  24. return;

  25. }

  26. //訊息處理,接收到訊息時如何處理

  27. JedisPubSub jps = new JedisPubSub() {

  28. /**

  29. * JedisPubSub類是一個沒有抽象方法的抽象類,裡面方法都是一些空實現

  30. * 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage

  31. * 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法

  32. * 當然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法引數為byte[]

  33. */

  34. @Override

  35. public void onMessage(String channel, String message) {

  36. if(Publisher.CHANNEL_KEY.equals(channel)) {

  37. System.out.println("接收到訊息: channel : " + message);

  38. //接收到exit訊息後退出

  39. if(EXIT_COMMAND.equals(message)) {

  40. System.exit(0);

  41. }

  42.  
  43. }

  44. }

  45.  
  46. /**

  47. * 訂閱時

  48. */

  49. @Override

  50. public void onSubscribe(String channel, int subscribedChannels) {

  51. if(Publisher.CHANNEL_KEY.equals(channel)) {

  52. System.out.println("訂閱了頻道:" + channel);

  53. }

  54. }

  55. };

  56. //可以訂閱多個頻道 當前執行緒會阻塞在這兒

  57. jedis.subscribe(jps, channel);

  58. }

  59.  
  60. public static void main(String[] args) {

  61. SubscribeClient client = new SubscribeClient();

  62. client.subscribe(Publisher.CHANNEL_KEY);

  63. //並沒有 unsubscribe方法

  64. //相應的也沒有punsubscribe方法

  65. }

  66. }

    先執行client,再執行Publisher進行訊息傳送,輸出結果:



總結:

    使用Redis的List資料結構可以簡單迅速地做一個訊息佇列,同時Redis提供的BRPOP和BLPOP等指令解決了頻繁呼叫Jedis的rpop和lpop方法造成的資源浪費問題。除此之外,Redis提供對釋出/訂閱模式的指令,可以實現訊息傳遞、程序間通訊。