RabbitMQ使用教程(四):釋出/訂閱模式—Publish/Subscribe
一、釋出/訂閱模式說明
今天我們來學習一點新的東西,之前我們是將一個訊息傳送給了一個特定的消費者,今天的做法完全不同,不再發送給某一個消費者,而是將一個訊息傳送給多個消費者,這便是:釋出/訂閱模式。
我們將使用該模式來實現一個日誌系統:一個程式產生日誌,一個程式處理日誌。
二、認識交換機
在之前的教程中,我們用郵局作了比喻,回顧一下我們之前的理解:
一個生產者是使用者程式,傳送訊息
一個佇列是存放訊息的地方,可以理解為緩衝區
一個消費者使用者程式,接收訊息
關鍵問題在於之前忘了講:生產者的訊息並不是直接傳送給佇列的,生產者本身也不知道要把訊息發給哪個佇列,而是將訊息傳送給交換機,再由交換機發送給佇列。
生產者的訊息並不是直接傳送給佇列的,而是傳送給交換機的,交換機再發給佇列的。 |
交換機理解起來非常簡單,它一端接收生產者傳送過來的訊息,一端將訊息傳送給佇列。所以,當交換機接收到一個訊息時,它必須作出相應的處理,是發給一個特定的佇列?還是發給多個佇列?還是丟棄…
處理的方式取決於交換機的型別!
這裡有四種可用的交換機型別:direct、topic、headers、fanout
我們使用最後一種fanout,建立該型別的交換機:
channel.exchangeDeclare("logs", "fanout");
fanout交換機,你很容易猜到它的作用:將接收到的訊息廣播給每一個佇列。
三、無名交換機(預設)
在之前的教程中,我們根本不知道交換機的概念,但我們還是將訊息成功地傳送了出去。
回顧一下之前的程式碼:
channel.basicPublish("", "hello", null, message.getBytes());
注意第一個引數,我們填了”“,這個”“正是預設交換機的名稱,它的作用是根據routingKey,傳送訊息。發現了routingKey:hello,便會把訊息傳送給hello佇列。到這裡我們已經明白:第一個引數是交換機名稱,第二個引數是routingKey名稱。
現在我們需要建立日誌系統所需的交換機:
channel.basicPublish ( "logs", "", null, message.getBytes());
四、臨時佇列
大家應該記得我們之前的佇列:”hello”、”task_queue”,我們的每一個佇列都有一個名稱。沒錯,佇列的名稱很重要,當我們需要特定的消費者來處理特點佇列中的訊息,消費者就是根據佇列名來找到佇列的。但是這在我們的日誌系統中不適用,消費者需要接收所有的訊息,而不是一個一部分,另外我們也只對剛剛產生的訊息進行處理,日誌一旦產生,就應該及時記錄或列印。為了做到這兩點,我們需要做兩件事:
- 無論何時連線到RabbitMQ的都是一個最新的、空的佇列。我們可以建立一個隨機命名的佇列,或者讓RabbitMQ幫我們隨機命名。
- 一旦消費者斷開連線,該佇列就馬上把刪除(不再接受新的訊息)
我們可以使用queueDeclare()方法實現上面兩點,建立一個不持久、獨佔的、自動刪除的、隨機命名的佇列。獨佔佇列是指:可以私用的佇列,指定某個特定的連線可用,連線斷開就自動刪除。隨機命名如:amq.gen-JzTY20BRgKO-HjmUJj0wLg.
String queueName=channel.queueDeclare().getQueue();
五、佇列繫結交換機
我們已經建立了一個fanout型別的交換機:”logs”,現在我們需要指定該交換機的訊息需要傳送的佇列,這個指定關係稱為繫結。
channel.queueBind(queueName, "logs", "");
六、程式碼實現
我們之前是使用的預設交換機,指定routingKey為佇列名稱的方式來發送訊息。
現在我們使用了fanout型別的”logs”交換機,(臨時)佇列與交換機繫結。這時候就不要routingKey了。
EmitLog.java
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 1; i <=10; i++) {
String message = "message" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());// arg0:不再使用""預設交換機,args1:有了交換機名,不再填對routingKey名
}
channel.close();
connection.close();
}
}
ReceiveLog1.java
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLog1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();// 產生一個不持久化、獨佔的、自動刪除的、隨機命名的佇列
channel.queueBind(queueName, EXCHANGE_NAME, "");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
首先執行接收端,再執行傳送端,ReceiveLog2.java與ReceiveLog1.java的程式碼大同小異,這裡建議使用IDEA,可以很直觀的看到結果。因為,強大的IDEA不會有console覆蓋問題。
執行結果:
參考命令:
列出所有交換機:
rabbitmqctl list_exchanges
列出所有繫結關係:
rabbitmqctl list_bindings