1. 程式人生 > >jedis的publish/subscribe

jedis的publish/subscribe

首先使用redis客戶端來進行publish與subscribe的功能是否能夠正常執行。

 開啟redis伺服器

  1. [[email protected] ~]# redis-server /opt/redis-2.4.10/redis.conf  
  2. [7719] 16 Apr 11:37:22 # Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now. 
  3. [7719] 16 Apr 11:37:22 * Server started, Redis version 2.4.10 
  4. [7719] 16 Apr 11:37:22 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory
     = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect. 

 開啟一個客戶端訂閱一個news.sports的channel。

  1. [[email protected] ~]# redis-cli 
  2. redis 127.0.0.1:6379> subscribe news.sports 
  3. Reading messages... (press Ctrl-C to quit) 
  4. 1) "subscribe" 
  5. 2) "news.sports" 
  6. 3) (integer) 1 

 可以看到已經開始了監聽,向news.sports channel釋出一條訊息

  1. [[email protected] ~]# redis-cli             
  2. redis 127.0.0.1:6379> publish news.sports "kaka is back" 
  3. (integer) 1 

 訂閱的客戶端順利收到訊息

  1. redis 127.0.0.1:6379> subscribe news.sports 
  2. Reading messages... (press Ctrl-C to quit) 
  3. 1) "subscribe" 
  4. 2) "news.sports" 
  5. 3) (integer) 1 
  6. 1) "message" 
  7. 2) "news.sports" 
  8. 3) "kaka is back" 

 接下來使用jedis來進行釋出/訂閱的驗證

 釋出訊息是通過jedis.publish(String channel, String message)來發布的,其實就是往redis伺服器釋出一條publish命令。

  1. public void publish(final byte[] channel, final byte[] message) { 
  2.        sendCommand(PUBLISH, channel, message); 
  3.    } 

 訂閱訊息是通過jedis.subscribe(JedisPub pub,String channel)來進行的,channel好理解,那麼JedisPub是什麼呢。

 看原始碼吧。

 Jedis訂閱方法的原始碼為

  1. public void subscribe(JedisPubSub jedisPubSub, String... channels) { 
  2.       checkIsInMulti(); 
  3.       connect(); 
  4.       client.setTimeoutInfinite(); 
  5.       jedisPubSub.proceed(client, channels); 
  6.       client.rollbackTimeout(); 
  7.   } 

可以看到,主要是通過jedisPubSub.proceed(client, channels);來進行訂閱的。看proceed方法。

  1. public void proceed(Client client, String... channels) { 
  2.        this.client = client; 
  3.        client.subscribe(channels); 
  4.        client.flush(); 
  5.        process(client); 
  6.    } 

追蹤client.subscribe(channels)可以看到,

  1. public void subscribe(final byte[]... channels) { 
  2.        sendCommand(SUBSCRIBE, channels); 
  3.    } 

其只是向伺服器傳送了一個subcribe的命令而已。

那麼要了解jedisPubSub的作用,只能看process方法了。簡單看process其實是一個do...while迴圈

  1. private void process(Client client) { 
  2.        do { 
  3.        } while (isSubscribed()); 
  4.    } 

我們可以猜測正是靠著這個迴圈來不斷的讀取伺服器那邊傳到來的訂閱的訊息。

看主體

  1. List<Object>reply = client.getObjectMultiBulkReply(); 
  2.            final Object firstObj = reply.get(0); 
  3.            if (!(firstObj instanceof byte[])) { 
  4.                throw new JedisException("Unknown message type: " + firstObj); 
  5.            } 
  6.            final byte[] resp = (byte[]) firstObj; 
  7.            if (Arrays.equals(SUBSCRIBE.raw, resp)) { 
  8.                subscribedChannels = ((Long) reply.get(2)).intValue(); 
  9.                final byte[] bchannel = (byte[]) reply.get(1); 
  10.                final String strchannel = (bchannel == null) ? null 
  11.                        : SafeEncoder.encode(bchannel); 
  12.                onSubscribe(strchannel, subscribedChannels); 
  13.            } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { 
  14.                subscribedChannels = ((Long) reply.get(2)).intValue(); 
  15.                final byte[] bchannel = (byte[]) reply.get(1); 
  16.                final String strchannel = (bchannel == null) ? null 
  17.                        : SafeEncoder.encode(bchannel); 
  18.                onUnsubscribe(strchannel, subscribedChannels); 
  19.            } else if (Arrays.equals(MESSAGE.raw, resp)) { 
  20.                final byte[] bchannel = (byte[]) reply.get(1); 
  21.                final byte[] bmesg = (byte[]) reply.get(2); 
  22.                final String strchannel = (bchannel == null) ? null 
  23.                        : SafeEncoder.encode(bchannel); 
  24.                final String strmesg = (bmesg == null) ? null : SafeEncoder 
  25.                        .encode(bmesg); 
  26.                onMessage(strchannel, strmesg); 
  27.            } else if (Arrays.equals(PMESSAGE.raw, resp)) { 
  28.                final byte[] bpattern = (byte[]) reply.get(1); 
  29.                final byte[] bchannel = (byte[]) reply.get(2); 
  30.                final byte[] bmesg = (byte[]) reply.get(3); 
  31.                final String strpattern = (bpattern == null) ? null 
  32.                        : SafeEncoder.encode(bpattern); 
  33.                final String strchannel = (bchannel == null) ? null 
  34.                        : SafeEncoder.encode(bchannel); 
  35.                final String strmesg = (bmesg == null) ? null : SafeEncoder 
  36.                        .encode(bmesg); 
  37.                onPMessage(strpattern, strchannel, strmesg); 
  38.            } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { 
  39.                subscribedChannels = ((Long) reply.get(2)).intValue(); 
  40.                final byte[] bpattern = (byte[]) reply.get(1); 
  41.                final String strpattern = (bpattern == null) ? null 
  42.                        : SafeEncoder.encode(bpattern); 
  43.                onPSubscribe(strpattern, subscribedChannels); 
  44.            } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { 
  45.                subscribedChannels = ((Long) reply.get(2)).intValue(); 
  46.                final byte[] bpattern = (byte[]) reply.get(1); 
  47.                final String strpattern = (bpattern == null) ? null 
  48.                        : SafeEncoder.encode(bpattern); 
  49.                onPUnsubscribe(strpattern, subscribedChannels); 
  50.            } else { 
  51.                throw new JedisException("Unknown message type: " + firstObj); 
  52.            } 

可以看到,通過client.getObjectMultiBulkReply()來得到返回來的訊息。判斷訊息的型別來進行不同的操作。比如Arrays.equals(SUBSCRIBE.raw, resp)判斷返回來的訊息是訂閱,subscribedChannels = ((Long) reply.get(2)).intValue();是取得訊息,也是do...while判斷迴圈的條件,也就是說這一次如果讀到訊息了,則進行下一次迴圈。那麼onSubscribe(String channel, int subscribedChannels)究竟做了什麼事,看開頭

  1. public abstract void onMessage(String channel, String message); 
  2.    public abstract void onPMessage(String pattern, String channel, 
  3.            String message); 
  4.    public abstract void onSubscribe(String channel, int subscribedChannels); 
  5.    public abstract void onUnsubscribe(String channel, int subscribedChannels); 
  6.    public abstract void onPUnsubscribe(String pattern, int subscribedChannels); 
  7.    public abstract void onPSubscribe(String pattern, int subscribedChannels); 

可以看到這是xetorthio留給我們的方法。onSubscrible是訂閱時應該做些什麼,onMessage就是有訊息傳來是做些什麼,以此類推。

接下來可以寫一個方法來發布和訂閱訊息了。

  1. package redis.client.jredis.tests; 
  2. import java.util.Timer; 
  3. import java.util.TimerTask; 
  4. import org.junit.Test; 
  5. import redis.clients.jedis.Jedis; 
  6. import redis.clients.jedis.JedisPool; 
  7. import redis.clients.jedis.JedisPoolConfig; 
  8. import redis.clients.jedis.JedisPubSub; 
  9. publicclass JedisTest extends JedisTestBase { 
  10.     JedisPool pool = null
  11.     /** 
  12.      * 測試釋出驗證 
  13.      */
  14.     @Test
  15.     publicvoid testPS(){ 
  16.         /** 
  17.         Jedis jedis = new Jedis("192.168.5.146",6379); 
  18.         jedis.set("name", "xiaoruoen"); 
  19.         jedis.publish("news.blog.title", "Hello,World"); 
  20.         //*/
  21.         final String host = "192.168.5.146"
  22.         JedisPoolConfig config = new JedisPoolConfig(); 
  23.         pool = new JedisPool(new JedisPoolConfig(),host); 
  24.         subscribe(new NewsListener(), "news.sports"); 
  25.         Timer timer = new Timer(); 
  26.         timer.schedule(new TimerTask() { 
  27.             @Override
  28.             publicvoid run() { 
  29.                 // TODO Auto-generated method stub
  30.                 publish("news.sports""{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"); 
  31.             } 
  32.         }, 10003000); 
  33.     } 
  34.     public Jedis getResource(int dbnum){ 
  35.         Jedis jedis = pool.getResource(); 
  36.         jedis.select(dbnum); 
  37.         return jedis; 
  38.     } 
  39.     /** 
  40.      *  
  41.      * @param channel 
  42.      * @param message ex:"{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}" 
  43.      */
  44.     publicvoid publish(String channel,String message){ 
  45.         Jedis jedis = getResource(12); 
  46.         jedis.publish(channel, message); 
  47.         pool.returnResource(jedis); 
  48.     } 
  49.     publicvoid subscribe(JedisPubSub listener,String channel){ 
  50.         Jedis jedis = getResource(12); 
  51.         jedis.subscribe(listener, channel); 
  52.         pool.returnResource(jedis); 
  53.     } 
  1. package redis.client.jredis.tests; 
  2. import redis.clients.jedis.JedisPubSub; 
  3. publicclass NewsListener extends JedisPubSub { 
  4.     @Override
  5.     publicvoid onMessage(String channel, String message) { 
  6.         System.out.println("get message from"+channel+"   "+message); 
  7.     } 
  8.     @Override
  9.     publicvoid onPMessage(String pattern, String channel, String message) { 
  10.         System.out.println("get message from"+channel+"   "+message); 
  11.     } 
  12.     @Override
  13.     publicvoid onSubscribe(String channel, int subscribedChannels) { 
  14.         System.out.println("subscribe the channel:"+channel); 
  15.     } 
  16.     @Override
  17.     publicvoid onUnsubscribe(String channel, int subscribedChannels) { 
  18.         System.out.println("get message from"+channel); 
  19.     } 
  20.     @Override
  21.     publicvoid onPUnsubscribe(String pattern, int subscribedChannels) { 
  22.         System.out.println("get message from"+subscribedChannels); 
  23.     } 
  24.     @Override
  25.     publicvoid onPSubscribe(String pattern, int subscribedChannels) { 
  26.         System.out.println("get message from"+subscribedChannels); 
  27.     } 

發現只打印了一條資料subscribe the channel:news.sports

沒按我們所期望的那樣那所有釋出的訊息都打印出來。

到官網檢視

看到Note that subscribe is a blocking operation operation because it will poll Redis for responses on the thread that calls subscribe.可以看到subcribe是一個執行緒中的塊操作。我猜測是在釋出與接收的過程中,如果在同一執行緒裡面進行操作,一邊阻塞著流,另一邊無法進行操作。於是將publish改寫為另一執行緒啟動。修改如下:

  1. publicstaticvoid main(String[] args){ 
  2.         final String host = "192.168.5.146"
  3.         JedisPoolConfig config = new JedisPoolConfig(); 
  4.         pool = new JedisPool(new JedisPoolConfig(),host); 
  5.         Thread thread = new Thread(new Test().new PublishThread()); 
  6.         thread.start(); 
  7.         subscribe(new NewsListener(), "news.sports"); 
  1. class PublishThread implements Runnable{ 
  2.         @Override 
  3.         public void run() { 
  4.             Timer timer = new Timer(); 
  5.             timer.schedule(new TimerTask() { 
  6.                 @Override 
  7.                 public void run() { 
  8.                     // TODO Auto-generated method stub 
  9.                     publish("news.sports", "{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"); 
  10.                 } 
  11.             }, 1000, 3000); 
  12.         }  
  13.     } 

最終釋出訂閱成功。

  1. Pool:[email protected] 
  2. subscribe the channel:news.sports 
  3. Pool:[email protected] 
  4. get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
  5. Pool:[email protected] 
  6. get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
  7. Pool:[email protected] 
  8. get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
  9. Pool:[email protected] 
  10. get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
  11. Pool:[email protected] 

相關推薦

RabbitMQ系列教程之三:發布/訂閱(Publish/Subscribe

mqc 標題 整合 參數 cti 事情 return 控制臺 run (本教程是使用Net客戶端,也就是針對微軟技術平臺的) 在前一個教程中,我們創建了一個工作隊列。工作隊列背後的假設是每個任務會被交付給一個【工人】。在這一部分我們將做一些完全不同的事情--我們將向多個

mqtt協議-broker之moqutte源碼研究二之SUBSCRIBE報文處理

mqtt moquette broker 源碼 這一篇開始講解moqutte對SUBSCRIBE報文的處理 代碼不復雜public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {String clientID

只註冊subscribe

style div false 中心 應用 str AS pre dubbo 如果有兩個鏡像環境,兩個註冊中心,有一個服務只在其中一個註冊中心有部署,另一個註冊中心還沒來得及部署,而兩個註冊中心的其它應用都需要依賴此服務。這個時候,可以讓服務提供者方只註冊服務到另一註冊中心

RabbitMQJava系列4-Publish/Subscribe

receive 發送 over ring prop get 不定 time lose 發布訂閱模式 X:交換機(轉發器) 生產者把消息發送到交換機,交換機把消息發送到隊列中,隊列需要綁定到交換機。 1,一個生產者,多個消費者 2,每個消費者都有自己的隊列 Java代碼實現

Akka源碼分析-Cluster-Distributed Publish Subscribe in Cluster

lin doc elong ror con div 隨機 ali 進行   在ClusterClient源碼分析中,我們知道,他是依托於“Distributed Publish Subscribe in Cluster”來實現消息的轉發的,那本文就來分析一下Pub/Sub是

RabbitMQ學習第三記:發布/訂閱模式(Publish/Subscribe

font image 直接 email err spl 回調方法 byte []   工作隊列模式是直接在生產者與消費者裏聲明好一個隊列,這種情況下消息只會對應同類型的消費者。   舉個用戶註冊的列子:用戶在註冊完後一般都會發送消息通知用戶註冊成功(失敗)。如果在一個系統中

php redis pub/sub(Publish/Subscribe,發布/訂閱的信息系統)之基本使用

終端 sage 命令 -c ring 腳本 端口 ack 端口號 一.場景介紹 最近的一個項目需要用到發布/訂閱的信息系統,以做到最新實時消息的通知。經查找後發現了redis pub/sub(發布/訂閱的信息系統)可以滿足我的開發需求,而且學習成本和使用成本也比較低。 二

angular的subscribe

  angular中可以使用observable和subscribe實現訂閱,從而實現非同步。   這裡記錄一個工作中的小問題,以加深對subscribe的理解。前端技能弱,慢慢積累中。   本來希望的是點選一個按鈕後出現一個loading的模態框,實際發現並沒有出現loading的模態框。     

python採用pika庫使用rabbitmq(七)Publish\Subscribe(訊息釋出\訂閱)

之前的例子都基本都是1對1的訊息傳送和接收,即訊息只能傳送到指定的queue裡,但有些時候你想讓你的訊息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了, Exchange在定義的時候是有型別的,以決定到底是哪些Queue符合條件,可以接收訊息 fanout: 所有bin

譯: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 釋出和訂閱

在第一篇教程中,我們展示瞭如何使用start.spring.io來利用Spring Initializr建立一個具有RabbitMQ starter dependency的專案來建立spring-amqp應用程式。 在上一個教程中,我們建立了一個新的包(tut2)來放置我們的配置,傳送者

Rxjava2的lint提示:The result of subscribe is not used的分析與解決

隨著Android Studio的進一步升級,其Lint能力愈加強大,比如現在專案中生成一個Obervable之後,若沒subscribe給Observer,而僅僅是給了幾個Consumer,則會被醒目的提示程式碼有問題,如下圖: 程式碼提示主要是The result of s

RabbitMQ 之 訂閱模式 Publish/Subscribe

模型圖 我們之前學習的都是一個訊息只能被一個消費者消費,那麼如果我想發一個訊息 能被多個消費者消費,這時候怎麼辦? 這時候我們就得用到了訊息中的釋出訂閱模型 在前面的教程中,我們建立了一個工作佇列,都是一個任務只交給一個消費者。這次我們做 將訊息傳送給多個消費者。這種模式叫做“釋出/訂閱”。 舉列:

KafkaConsumer assign VS subscribe

背景 在kafka中,正常情況下,同一個group.id下的不同消費者不會消費同樣的partition,也即某個partition在任何時刻都只能被具有相同group.id的consumer中的一個消費。 也正是這個機制才能保證kafka的重要特性: 1、可以通過增加partitions和consu

九 assign和subscribe

1 subscribe:  自動安排分割槽, 通過group自動重新的負載均衡;   關於Group的實驗: 如果auto commit = true, 重新啟動程序,如果是同樣的groupID,從上次commit的地方開始消費資料,但是如果換了group後,就

九 assign和subscribe

函數 false rec 現象 不支持 l命令 produce 服務端 col 1 subscribe: 自動安排分區, 通過group自動重新的負載均衡; 關於Group的實驗: 如果auto commit = true, 重新啟動進程,如果是同樣的groupID

ROS subscribe回撥函式的多引數使用

ROS 的subscribe函式,它在api中的原型為 template<class M , class T > Subscriber ros::NodeHandle::subscribe(const std::string & topic

Image subscribe and callback error

Scanning dependencies of target image_listen_node [ 62%] Built target imu_subscribe_node [ 50%] Building CXX object image_listen/CM

統一客服訊息返回錯誤:{"errcode":43004,"errmsg":"require subscribe hint: [9Vv08633952]"}

公眾號或者小程式傳送客服訊息錯誤: {"errcode":43004,"errmsg":"require subscribe hint: [9Vv08633952]"} 場景:小程式使用公眾號的服務訊息,推送訊息,如果接收人沒有關注公眾號,就會出現以上錯誤。 原因: require subscribe錯

10.13以太坊Solidity智慧合約彙編整合開發2和web3.eth.subscribe詳解

--1-- 一、獨立組裝 上面描述為內聯彙編的組合語言也可以單獨使用,實際上,計劃是將它用作Solidity編譯器的中間語言。在這種形式下,它試圖實現幾個目標:

RabbitMQ Publish/Subscribe for Java【入門教程 3】

 首先通過上面兩個入門教程我們引入本文: Publish/Subscribe:在上一章中,我們學習建立了一個訊息佇列,她的每個任務訊息只發送給一個佇列,然後佇列的資訊由消費者各自消費。這一章,我們會將同一個任務訊息傳送給多個佇列。這種模式就是“釋出/訂閱”。為了將訊息傳送