rabbitmq--釋出-訂閱模型
上兩篇中我們介紹了rabbitmq中的收發模型和工作佇列模型,它們都一個共同的特點——exchange是透明的,似乎不存在,收、發雙方都是通過queue名稱產生關聯。
本篇中我們來著重介紹下rabbitmq中的exchange ,rabbitmq中涉及exchange(非透明)的收發模型我們稱之為釋出-訂閱模型。在釋出-訂閱模型中,訊息生產者和消費者不再是直接面對queue(佇列名稱),而是直面exchange,都需要經過exchange來進行訊息的傳送和接收。
釋出-訂閱模型又可以分為以下幾種:
fanout: 廣播交換機,廣播模型,不關心queue名稱,不關心路由key
direct: 直接交換機,直接模型,根據路由key字串精確匹配, 不關心queue名稱,關心路由key
topic:主題交換機,主題模型,根據路由key字串模糊(萬用字元)匹配, 不關心queue名稱,關心路由key
headers:頭交換機,訊息頭模型(使用較少),不關心queue名稱,不關心路由key,關心訊息頭中key-value對。
本篇,我們來介紹下第一種釋出訂閱模型——fanout廣播模型。
廣播模型的特點是:所有發往同一個fanout交換機的訊息都會被所有監聽這個交換機的消費者接收到,而不關心具體的路由key。廣播交換機就如同一個廣播地址,只要有消費者監聽這個廣播地址,則一定都會收到來自這個廣播地址收到的所有訊息。如圖:
示例程式碼如下
日誌生產者:
package com.tingcream.rabbitmq.broadcast; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class LogProducer { private static String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); //主機 埠 vhost 使用者名稱 密碼 factory.setHost("192.168.9.102"); factory.setUsername("rabbitmq"); factory.setPassword("rabbitmq123"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setVirtualHost("/"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); // 宣告一個fanout 廣播交換機,名稱為logs , 宣告的fanout佇列 autoDelete為true, exclude 為true channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //amq.gen-xxxxxxxxxx AD EXCL // channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments) for (int i=0;i<10;i++){ String message="你好 World "+i; // channel.basicPublish(exchange, routingKey, props, body); //釋出訊息時指定routingKey為"" channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("LogProducer Send :" + message ); } channel.close(); connection.close(); } }
日誌生產者程式碼中聲明瞭一個名叫logs的fanout型別交換機,並使用這個交換機來發送訊息,並且傳送訊息時設定路由key為"",因為在廣播的場景中根本就不關心路由key。
日誌消費者A:
package com.tingcream.rabbitmq.broadcast; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class LogReceiverA { private static String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); //主機 埠 vhost 使用者名稱 密碼 factory.setHost("192.168.9.102"); factory.setUsername("rabbitmq"); factory.setPassword("rabbitmq123"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setVirtualHost("/"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); //重要 與生產者使用同一個交換機 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //獲取一個隨機的佇列名稱 String queueName = channel.queueDeclare().getQueue(); //關聯 exchange 和 queue ,因為是廣播無需指定routekey,routingKey設定為空字串 // channel.queueBind(queue, exchange, routingKey) channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("LogReceiverA Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println( "LogReceiverA接收到訊息:" + message ); } }; channel.basicConsume(queueName, true, consumer); } }
日誌消費者B:
package com.tingcream.rabbitmq.broadcast;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class LogReceiverB {
private static String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主機 埠 vhost 使用者名稱 密碼
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
//重要 與生產者使用同一個交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//獲取一個隨機的佇列名稱
String queueName = channel.queueDeclare().getQueue();
//關聯 exchange 和 queue ,因為是廣播無需指定routekey,routingKey設定為空字串
// channel.queueBind(queue, exchange, routingKey)
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("LogReceiverB Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println( "LogReceiverA接收到訊息:" + message );
}
};
//true 自動回覆ack
channel.basicConsume(queueName, true, consumer);
}
}
注意:
日誌消費者A、B程式碼中使用了隨機生成的queue名稱(在本地隨機產生一個名稱),然後繫結這個queue名稱到logs交換機上了。這是因為所有釋出-訂閱模型對queue名稱都不關心,只要queue名稱唯一、不重複即可。繫結queue到交換機是一種慣例的做法,實際上就是從交換機上取出一條訊息並關聯上一個指定的queue,然後我們就可以對這個queue進行監聽等等,當然本例中queue的名稱是隨機生成。
先啟動消費者A、B, 再啟動生產者,執行下,登入管理後臺。
登入管理後臺後,我們發現exchange標籤項中最後一行多出了一個名叫logs交換機,它的型別為fanout(廣播),這就是我們程式碼中自定義的一個交換機。其上的7個交換機都是rabbitmq中內建的交換機。(最上面一個我們已經很熟悉了,收-發模型和工作佇列模型就是採用的這種exchange)
queue標籤項中多出了兩條隨機生成的佇列名稱。