RabbitMQ 工作模式二
之前寫了WORKQUEUES 跟 Publish/Subscribe 倆種模式 ,RabbitMQ 工作模式一
Routing 工作模式
特點
每個消費者監聽自己的佇列,並且設定routingkey
生產者將訊息發給交換機,由交換機根據routingkey來轉發訊息到指定的佇列
說白了,就是在publish/subscribe 工作模式的基礎上加一層篩選,判斷
if(佇列.routingkey == 生產者.routingkey) 傳送訊息給duilie
下面是我寫的生產者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer03 { //佇列名稱 private static final String QUEUE_INFORM_TEST1 = "queue_inform_test1"; private static final String QUEUE_INFORM_TEST2 = "queue_inform_test2"; public final static String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); //設定ip factory.setHost("localhost"); //設定埠 factory.setPort(5672); //設定賬號密碼 factory.setUsername("guest"); //預設賬號密碼都是guest factory.setPassword("guest"); //設定虛擬空間 factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器 Connection connection = factory.newConnection(); Channel channel = connection.createChannel();//建立一個通道 /** * 定義交換機 * param1 : 交換機名稱 * param2 : 交換機型別 * */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** * 定義訊息佇列 * * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_INFORM_TEST1,true ,false ,false ,null); channel.queueDeclare(QUEUE_INFORM_TEST2,true ,false ,false ,null); //繫結交換機跟佇列 /* String queue, String exchange, String routingKey */ channel.queueBind(QUEUE_INFORM_TEST1,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST1); channel.queueBind(QUEUE_INFORM_TEST2,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST2); String message = ""; //給佇列傳送訊息 for (int i = 0; i < 9; i++) { // // String exchange, String routingKey, BasicProperties props, byte[] body message = "人間有百媚千紅,唯你是我情之所鍾。第"+i+"條訊息"; //給test1 傳送9條訊息 channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_TEST1 ,null ,message.getBytes() ); } for (int i = 0; i < 5; i++) { // // String exchange, String routingKey, BasicProperties props, byte[] body message = "你別回頭看我了,走吧,山高水長,可別再碰到我這麼喜歡你的人了。第"+i+"條訊息"; //給test2 傳送5條訊息 channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_TEST2 ,null ,message.getBytes() ); } channel.close(); connection.close(); } }
設定了交換機名稱,及路由的交換機型別
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
--------------------------------------------------------------------------------------------------------------------------------------
聲明瞭倆個佇列...分別為 test1, test2
** * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * * param1: 佇列名稱 * param2: 是否持久化 * param3 : 是否獨佔此佇列 * param4 : 佇列不用是否自動刪除 * param5 : 引數 */
channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );
channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );
------------------------------------------------------------------------------------------------------------
佇列和交換機繫結
//交換機和佇列繫結 /** * String queue, String exchange, String routingKey * param1 : 佇列名稱 * exchange : 交換機 * routingKey : 給佇列新增一個 路由key,交換機發送訊息時根據填寫的路由key 來判斷,如果填寫的key 跟 佇列的路由key 相同,那麼就會發送訊息給此佇列 */
channel.queueBind(QUEUE_INFORM_TEST1,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST1);
channel.queueBind(QUEUE_INFORM_TEST2,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST2);
-------------------------------------------------------------------------
傳送訊息
/** * String exchange, String routingKey, BasicProperties props, byte[] body * * param1 交換機名稱 * param2 根據key名稱將訊息轉發到具體的佇列,這裡填寫佇列名稱表示訊息將發到此佇列 * param3 引數 * param4 傳遞的字串 * * */
channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());
``````````````````````````````````````````````````````````````````````````````````````````````````````````````
以上是生產者的程式碼 ,下面提供消費者的程式碼
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest1 {
//佇列名稱
private static final String QUEUE_INFORM_TEST1 = "queue_inform_test1";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定埠
factory.setPort(5672);
//設定賬號密碼
factory.setUsername("guest"); //預設賬號密碼都是guest
factory.setPassword("guest");
//設定虛擬空間
factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器
//建立與RabbitMQ服務的TCP連線
connection = factory.newConnection();
//建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
channel = connection.createChannel();
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽佇列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 佇列名稱
* param2 : 是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
* param3 : 消費物件,,包含消費方法
*
*/
channel.basicConsume(QUEUE_INFORM_TEST1,true,consumer);
}catch (Exception e){
}
}
}
消費者二
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest2 {
private static final String QUEUE_INFORM_TEST2 = "queue_inform_test2";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定埠
factory.setPort(5672);
//設定賬號密碼
factory.setUsername("guest"); //預設賬號密碼都是guest
factory.setPassword("guest");
//設定虛擬空間
factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器
//建立與RabbitMQ服務的TCP連線
connection = factory.newConnection();
//建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
channel = connection.createChannel();
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽佇列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 佇列名稱
* param2 : 是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
* param3 : 消費物件,,包含消費方法
*
*/
channel.basicConsume(QUEUE_INFORM_TEST2,true , consumer);
}catch (Exception e){
}
}
}
消費者的程式碼其實很簡單,就是監聽而已.只需要指定一下監聽的佇列就行並提供一個 執行的方法 ,就是下面這句
channel.basicConsume(QUEUE_INFORM_TEST2,true , consumer);
測試
先啟動producer,否則會報錯,因為consumer 中沒有宣告佇列,並且沒有中rabbitmq中發現佇列,就會丟擲異常
生產者 會給 test1 傳送 9條訊息 給test 傳送 5條訊息
然後我們依次啟動倆個消費者
然後看處理結果
Topics 工作模式
萬用字元工作模式
每個消費者監聽自己的佇列,並且設定帶統配符的routingkey
生產者將訊息發給broker,由交換機根據routingkey來轉發訊息到指定的佇列。
統配符規則:
符號#可以匹配多個詞,
符號*可以匹配一個詞語。
因為萬用字元感覺比較難講,所以我在網上找了一個充值的案例
場景
使用者充值完成, email 的使用者 接收email的提示, sms的使用者接收sms的提示.. 設定兩種通知型別都接收的則兩種通知都有效。
大致思路
設定 路由匹配規則
郵件的匹配規則 "inform.#.email.#" sms的匹配規則 "inform.#.sms.#"
給只接收email的使用者 傳送訊息 路由設定 inform.email
給只接收 sm的使用者 傳送訊息 路由設定 inform.sms
給 都接收的使用者 傳送訊息 inform.sms.email 不懂的話 看一下匹配規則,,有點取巧來著
下面是我寫的生產者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer03 {
//佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public final static String EXCHANGE_ROUTING_INFORM = "exchange_topic_inform";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定埠
factory.setPort(5672);
//設定賬號密碼
factory.setUsername("guest"); //預設賬號密碼都是guest
factory.setPassword("guest");
//設定虛擬空間
factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();//建立一個通道
/**
* 定義交換機
* param1 : 交換機名稱
* param2 : 交換機型別
*
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);
/**
* 定義訊息佇列
*
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
*
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true ,false ,false ,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true ,false ,false ,null);
//繫結交換機跟佇列
/*
String queue, String exchange, String routingKey
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"inform.#.email.#");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"inform.#.sms.#");
String message = "";
//給接收郵件的人發郵件
for (int i = 0; i < 9; i++) {
//
// String exchange, String routingKey, BasicProperties props, byte[] body
message = "記憶最讓人崩潰的地方,也許就在於它的猝不及防在某個祥和的午後,你正吃著火鍋唱著歌,那些尖利的記憶碎片就像潮水突然湧進你到腦海裡,讓你閃躲不及。";
//給test1 傳送9條訊息
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.email" ,null ,message.getBytes() );
}
//給接收簡訊的人發簡訊
for (int i = 0; i < 5; i++) {
//
// String exchange, String routingKey, BasicProperties props, byte[] body
message = "傳送了一百條簡訊,九十九條都是你";
//給test2 傳送5條訊息
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms" ,null ,message.getBytes() );
}
// 給都接收的人傳送
for (int i = 0; i < 3; i++) {
message = "我回你是秒回,你回我是輪迴";
//給test2 傳送5條訊息
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms.email" ,null ,message.getBytes() );
}
channel.close();
connection.close();
}
}
-----------------------
定義交換機,並設定交換機的型別為萬用字元模式
/** * 定義交換機 * param1 : 交換機名稱 * param2 : 交換機型別 * */
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);
---------------------------------------------------
繫結交換機跟佇列,並配置佇列的路由萬用字元規則
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"inform.#.email.#");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"inform.#.sms.#");
--------------------------------------------------------------------------------------------
傳送訊息給broker 併發送路由key
/*** 引數明細
* 1、交換機名稱,不指令使用預設交換機名稱 Default Exchang
* 2、routingKey(路由key),根據key名稱將訊息轉發到具體的佇列,這裡填寫佇列名稱表示訊息將發到此佇列
* 3、訊息屬性
* 4、訊息內容*/
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.email" ,null ,message.getBytes() );
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms" ,null ,message.getBytes() );
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms.email" ,null ,message.getBytes());
訊息監聽者1
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest1 {
//佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定埠
factory.setPort(5672);
//設定賬號密碼
factory.setUsername("guest"); //預設賬號密碼都是guest
factory.setPassword("guest");
//設定虛擬空間
factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器
//建立與RabbitMQ服務的TCP連線
connection = factory.newConnection();
//建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
channel = connection.createChannel();
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽佇列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 佇列名稱
* param2 : 是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
* param3 : 消費物件,,包含消費方法
*
*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
}catch (Exception e){
}
}
}
訊息監聽二
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest2 {
private static final String QUEUE_INFORM_SMS= "queue_inform_sms";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定埠
factory.setPort(5672);
//設定賬號密碼
factory.setUsername("guest"); //預設賬號密碼都是guest
factory.setPassword("guest");
//設定虛擬空間
factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器
//建立與RabbitMQ服務的TCP連線
connection = factory.newConnection();
//建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
channel = connection.createChannel();
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽佇列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 佇列名稱
* param2 : 是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
* param3 : 消費物件,,包含消費方法
*
*/
channel.basicConsume(QUEUE_INFORM_SMS,true , consumer);
}catch (Exception e){
}
}
}
其實監聽者程式碼都不要變來著,都是通用的
測試
先啟動生產者,後啟動監聽者.....理由是 如果先啟動監聽者,佇列沒建立,就會報錯
測試結果
sms的列印臺列印結果
email 列印臺列印結果
其實搞懂了路由模式後這個就很容易理解這個....
Header模式
header 模式其實跟路由模式很像,他們不同的是header模式取消routingkey,使用header中的 key/value(鍵值對)匹配佇列
瞭解了很多不同的模式,其實你會發現,程式碼很多都是相同的....這裡我就不貼上全部程式碼,就只給不同的程式碼了
繫結交換機跟佇列的程式碼
HashMap<String, Object> header_email = new HashMap<>();
header_email.put("inform_type", "cms");
HashMap<String, Object> header_sms = new HashMap<>();
header_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"",header_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"",header_sms);
傳送訊息的程式碼
message = "記憶最讓人崩潰的地方,也許就在於它的猝不及防在某個祥和的午後,你正吃著火鍋唱著歌,那些尖利的記憶碎片就像潮水突然湧進你到腦海裡,讓你閃躲不及。";
HashMap<String, Object> header = new HashMap<>();
header.put("inform_type", "sms"); //匹配sms通知消費者繫結的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(header)
//給test1 傳送9條訊息
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"" ,properties.build() ,message.getBytes() );
消費者都是一樣的,改一下佇列名稱就行