rabbitMQ學習筆記(4):pub/sub
阿新 • • 發佈:2019-02-04
在實際的rabbitMQ使用中,大部分場景都比較複雜,沒有像上一篇學習筆記裡介紹的單個consumer那麼簡單。通常會將一個訊息deliver到多個consumer。這就是rabbitMQ實現的pub/sub模式,也就是釋出/訂閱模式。
為了闡述這個模式,我們實現一個簡單的日誌系統來演示pub/sub。系統的功能十分簡單,一個生產者向rabbitMQ寫入log,兩個消費者監聽rabbitMQ獲得log資訊,其中一個消費者將log列印到控制檯上,另一個將log列印到磁碟上。在展示程式碼之前我們需要理解一些基本概念。
exchanges
rabbitMQ中所謂的交換機。在前一篇筆記中我們是通過在佇列(queue)中傳送和接收訊息,來實現producer和consumer的解耦,我們簡單快速的回顧一下前面介紹的rabbitMQ訊息模型: 1、生產者是一個傳送訊息的應用 2、佇列作為訊息的緩衝和儲存 3、消費者從佇列中接收訊息並處理 然而這個模型過於“simple”和粗粒度了,rabbitMQ的訊息模型的核心思想是生產者和消費者的完全解耦。具體而言,就是生產者不會將訊息傳送到一個具體的佇列,甚至大多是情況下生產者都不知道訊息是否被髮送到了佇列中。那麼rabbitMQ又是如何將訊息傳送到佇列中的呢? Producer傳送的Message實際上是發到了Exchange中。它的功能也很簡單:從Producer接收Message,然後投遞到queue中。exchange的型別定義有direct、topic、headers和fanout這幾種,最後一種fanout就是廣播,exchange會將收到的訊息傳送到所有的佇列中。建立一個fanout的exchange,程式碼如下:
channel.exchangeDeclare("logs", "fanout");
有了exchange之後,生產者就可以向exchange傳送訊息:
channel.basicPublish( "logs", "", null, message.getBytes());
臨時佇列
截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。但是對於我們將要構建的日誌系統,並不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現這個目標,我們需要兩件事情:
1) 每當Consumer連線時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在宣告queue時不指定名字,那麼RabbitMQ會隨機為我們選擇這個名字。
2)當consumer斷開和佇列的連線時佇列應該被自動刪除。 在rabbitMQ java api裡,我們可以宣告一個臨時的、可被自動刪除的佇列:
String queueName = channel.queueDeclare().getQueue();
這裡的queueName是一個隨機產生的佇列名,類似amq.gen-JzTY20BRgKO-HjmUJj0wLg.這樣的名字
bindings
上面我們已經建立了一個廣播exchange和一個佇列,現在我們需要告訴exchange向佇列中傳送訊息,binding就是佇列和交換機之間的關係。
channel.queueBind(queueName, "logs", "");
binding完成後exchange 就會向佇列中傳送訊息了。
最終版本
Producer,在這裡就是產生log的program,基本上和前幾個都差不多。最主要的區別就是publish通過了exchange而不是routing_key。
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
還有一點要注意的是我們聲明瞭exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丟棄的。consumer:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
另一個consumer只要把輸出到螢幕上的那行程式碼改成輸出到檔案,其他並無不同,這裡就不貼出來了。