1. 程式人生 > >rabbitMQ學習筆記(4):pub/sub

rabbitMQ學習筆記(4):pub/sub

   在實際的rabbitMQ使用中,大部分場景都比較複雜,沒有像上一篇學習筆記裡介紹的單個consumer那麼簡單。通常會將一個訊息deliver到多個consumer。這就是rabbitMQ實現的pub/sub模式,也就是釋出/訂閱模式。

        為了闡述這個模式,我們實現一個簡單的日誌系統來演示pub/sub。系統的功能十分簡單,一個生產者向rabbitMQ寫入log,兩個消費者監聽rabbitMQ獲得log資訊,其中一個消費者將log列印到控制檯上,另一個將log列印到磁碟上。在展示程式碼之前我們需要理解一些基本概念。

exchanges

    rabbitMQ中所謂的交換機。在前一篇筆記中我們是通過在佇列(queue)中傳送和接收訊息,來實現producer和consumer的解耦,我們簡單快速的回顧一下前面介紹的rabbitMQ訊息模型: 1、生產者是一個傳送訊息的應用 2、佇列作為訊息的緩衝和儲存 3、消費者從佇列中接收訊息並處理      然而這個模型過於“simple”和粗粒度了,rabbitMQ的訊息模型的核心思想是生產者和消費者的完全解耦。具體而言,就是生產者不會將訊息傳送到一個具體的佇列,甚至大多是情況下生產者都不知道訊息是否被髮送到了佇列中。那麼rabbitMQ又是如何將訊息傳送到佇列中的呢? Producer傳送的Message實際上是發到了Exchange中。它的功能也很簡單:從Producer接收Message,然後投遞到queue中。
Exchange需要知道如何處理Message,是把它放到那個queue中,還是放到多個queue中?這個rule是通過Exchange 的型別定義的。                    
exchange的型別定義有direct、topic、headers和fanout這幾種,最後一種fanout就是廣播,exchange會將收到的訊息傳送到所有的佇列中。建立一個fanout的exchange,程式碼如下:
channel.exchangeDeclare("logs", "fanout");
有了exchange之後,生產者就可以向exchange傳送訊息:
channel.basicPublish( "logs", "", null, message.getBytes());

臨時佇列

截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。


    但是對於我們將要構建的日誌系統,並不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現這個目標,我們需要兩件事情:
    1) 每當Consumer連線時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在宣告queue時不指定名字,那麼RabbitMQ會隨機為我們選擇這個名字。
   2)當consumer斷開和佇列的連線時佇列應該被自動刪除。 在rabbitMQ java api裡,我們可以宣告一個臨時的、可被自動刪除的佇列:
String queueName = channel.queueDeclare().getQueue();

這裡的queueName是一個隨機產生的佇列名,類似amq.gen-JzTY20BRgKO-HjmUJj0wLg.這樣的名字

bindings

       
上面我們已經建立了一個廣播exchange和一個佇列,現在我們需要告訴exchange向佇列中傳送訊息,binding就是佇列和交換機之間的關係。
channel.queueBind(queueName, "logs", "");
binding完成後exchange 就會向佇列中傳送訊息了。

最終版本

           
Producer,在這裡就是產生log的program,基本上和前幾個都差不多。最主要的區別就是publish通過了exchange而不是routing_key。
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}
還有一點要注意的是我們聲明瞭exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丟棄的。
consumer:
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

另一個consumer只要把輸出到螢幕上的那行程式碼改成輸出到檔案,其他並無不同,這裡就不貼出來了。