RabbitMQ六種工作模式有哪些?怎樣用SpringBoot整合RabbitMQ
阿新 • • 發佈:2021-01-23
目錄
- 一、RabbitMQ入門程式
- 二、Work queues 工作模式
- 三、Publish / Subscribe 釋出/訂閱模式
- 四、Routing 路由模式
- 五、Topics
- 六、Header
- 七、RPC
- 八、Spring Data Elasticsearch
一、RabbitMQ入門程式
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> </dependencies>
application.yml
server:
port: 44000
spring:
application:
name: test-rabbitmq-producer
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
virtual-host: /
訊息傳送者
/** * Description: rabbitmq入門程式 * * @author zygui * @date Created on 2020/5/13 15:34 */ public class Producer01 { // 宣告一個訊息佇列名稱 private static final String QUEUE_NAME = "helloworld"; public static void main(String[] args) { // 通過連線工廠建立新的連線與mq建立連線 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq connectionFactory.setVirtualHost("/"); // 預設為 / 即可 // 建立連線 Connection connection = null; // 建立通道(目的是為了複用連線) Channel channel = null; try { //建立新連線 connection = connectionFactory.newConnection(); //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成 channel = connection.createChannel(); //宣告佇列,如果佇列在mq 中沒有則要建立 //引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 引數明細 * 1、queue 佇列名稱 * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在 * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立 * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除) * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間 */ channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 傳送訊息 //引數:String exchange, String routingKey, BasicProperties props, byte[] body /** * 引數明細: * 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"") * 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱 * 3、props,訊息的屬性 * 4、body,訊息內容 */ //訊息內容 String message = "hello world 桂朝陽"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("send to mq "+message); } catch (Exception e) { e.printStackTrace(); } finally { try { // 關閉通道 channel.close(); // 關閉連線 connection.close(); } catch (Exception e) { e.printStackTrace(); } } } }
訊息接收者
/** * Description: rabbitmq入門程式 * * @author zygui * @date Created on 2020/5/13 15:45 */ public class Consumer01 { private static final String QUEUE_NAME = "helloworld"; public static void main(String[] args) throws IOException, TimeoutException { //通過連線工廠建立新的連線和mq建立連線 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//埠 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq connectionFactory.setVirtualHost("/"); //建立新連線 Connection connection = connectionFactory.newConnection(); //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成 Channel channel = connection.createChannel(); //監聽佇列 //宣告佇列,如果佇列在mq 中沒有則要建立 //引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 引數明細 * 1、queue 佇列名稱 * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在 * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立 * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除) * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間 */ channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 實現消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { /** * 當接收到訊息後此方法將被呼叫 * @param consumerTag 消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume * @param envelope 信封,通過envelope * @param properties 訊息屬性 * @param body 訊息內容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收 long deliveryTag = envelope.getDeliveryTag(); //訊息內容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //監聽佇列 //引數:String queue, boolean autoAck, Consumer callback /** * 引數明細: * 1、queue 佇列名稱 * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆 * 3、callback,消費方法,當消費者接收到訊息要執行的方法 */ channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
入門程式步驟
二、Work queues 工作模式
三、Publish / Subscribe 釋出/訂閱模式
訊息生產者
public class Producer02_publish {
//佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
// 交換機名稱
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args) {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//建立新連線
connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
channel = connection.createChannel();
//宣告佇列,如果佇列在mq 中沒有則要建立
//引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
//傳送訊息
//引數:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 引數明細:
* 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"")
* 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱
* 3、props,訊息的屬性
* 4、body,訊息內容
*/
for(int i=0;i<5;i++){
//訊息內容
String message = "send inform message to user";
channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());
System.out.println("send to mq "+message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//關閉連線
//先關閉通道
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
訊息接收者1
public class Consumer02_subscribe_email {
//佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連線
Connection connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
//實現消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 當接收到訊息後此方法將被呼叫
* @param consumerTag 消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽佇列
//引數:String queue, boolean autoAck, Consumer callback
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
* 3、callback,消費方法,當消費者接收到訊息要執行的方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
}
}
訊息接收者2
public class Consumer02_subscribe_sms {
//佇列名稱
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連線
Connection connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");
//實現消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 當接收到訊息後此方法將被呼叫
* @param consumerTag 消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽佇列
//引數:String queue, boolean autoAck, Consumer callback
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
* 3、callback,消費方法,當消費者接收到訊息要執行的方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
}
}
四、Routing 路由模式
訊息生產者
public class Producer03_routing {
//佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
// 交換機名稱
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
// 路由鍵名稱
private static final String ROUTINGKEY_EMAIL="inform_email";
private static final String ROUTINGKEY_SMS="inform_sms";
public static void main(String[] args) {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//建立新連線
connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
channel = connection.createChannel();
//宣告佇列,如果佇列在mq 中沒有則要建立
//引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
//channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
//channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");
//傳送訊息
//引數:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 引數明細:
* 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"")
* 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱
* 3、props,訊息的屬性
* 4、body,訊息內容
*/
/* for(int i=0;i<5;i++){
//傳送訊息的時候指定routingKey
String message = "send email inform message to user";
channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());
System.out.println("send to mq "+message);
}*/
for(int i=0;i<5;i++){
//傳送訊息的時候指定routingKey
String message = "send sms inform message to user";
channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());
System.out.println("send to mq "+message);
}
// 此時指定的路由鍵是 inform, 所以兩個消費者都可以消費
/*for(int i=0;i<5;i++){
//傳送訊息的時候指定routingKey
String message = "send inform message to user";
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());
System.out.println("send to mq "+message);
}*/
} catch (Exception e) {
e.printStackTrace();
} finally {
//關閉連線
//先關閉通道
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
訊息接收者1
public class Consumer03_routing_email {
//佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
// 交換機名稱
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
// 路由鍵名稱
private static final String ROUTINGKEY_EMAIL="inform_email";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連線
Connection connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
//實現消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 當接收到訊息後此方法將被呼叫
* @param consumerTag 消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽佇列
//引數:String queue, boolean autoAck, Consumer callback
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
* 3、callback,消費方法,當消費者接收到訊息要執行的方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
}
}
http://www.dtmao.cc/news_show_631033.shtml
訊息接收者2
public class Consumer03_routing_sms {
//佇列名稱
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
private static final String ROUTINGKEY_SMS="inform_sms";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連線
Connection connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
//實現消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 當接收到訊息後此方法將被呼叫
* @param consumerTag 消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽佇列
//引數:String queue, boolean autoAck, Consumer callback
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
* 3、callback,消費方法,當消費者接收到訊息要執行的方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
}
}
五、Topics
public class Producer04_topics {
// 佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
// 宣告交換機
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
// 使用萬用字元的方式來,設定路由鍵
private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
private static final String ROUTINGKEY_SMS="inform.#.sms.#";
public static void main(String[] args) {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//建立新連線
connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
channel = connection.createChannel();
//宣告佇列,如果佇列在mq 中沒有則要建立
//引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
//傳送訊息
//引數:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 引數明細:
* 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"")
* 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱
* 3、props,訊息的屬性
* 4、body,訊息內容
*/
for(int i=0;i<5;i++){
//傳送訊息的時候指定routingKey
String message = "send email inform message to user";
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
System.out.println("send to mq "+message);
}
for(int i=0;i<5;i++){
//傳送訊息的時候指定routingKey
String message = "send sms inform message to user";
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
System.out.println("send to mq "+message);
}
for(int i=0;i<5;i++){
//傳送訊息的時候指定routingKey
String message = "send sms and email inform message to user";
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
System.out.println("send to mq "+message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//關閉連線
//先關閉通道
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
訊息接收者1
public class Consumer04_topics_email {
//佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連線
Connection connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
//實現消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 當接收到訊息後此方法將被呼叫
* @param consumerTag 消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽佇列
//引數:String queue, boolean autoAck, Consumer callback
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
* 3、callback,消費方法,當消費者接收到訊息要執行的方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
}
}
小寫接收者2
public class Consumer04_topics_sms {
//佇列名稱
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_SMS="inform.#.sms.#";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連線工廠建立新的連線和mq建立連線
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//埠
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連線
Connection connection = connectionFactory.newConnection();
//建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 引數明細
* 1、queue 佇列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
* 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
* 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
* 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//宣告一個交換機
//引數:String exchange, String type
/**
* 引數明細:
* 1、交換機的名稱
* 2、交換機的型別
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//進行交換機和佇列繫結
//引數:String queue, String exchange, String routingKey
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
*/
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
//實現消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 當接收到訊息後此方法將被呼叫
* @param consumerTag 消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽佇列
//引數:String queue, boolean autoAck, Consumer callback
/**
* 引數明細:
* 1、queue 佇列名稱
* 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
* 3、callback,消費方法,當消費者接收到訊息要執行的方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
}
}
六、Header
七、RPC
八、Spring Data Elasticsearch
rabbitmq-producer 訊息傳送者
@Configuration
public class RabbitMQConfig {
// 宣告兩個佇列常量
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
// 宣告交換機常量
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
// 宣告兩個路由鍵常量
public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static final String ROUTINGKEY_SMS="inform.#.sms.#";
//宣告交換機
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM(){
//durable(true) 持久化,mq重啟之後交換機還在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
// 宣告佇列
//宣告QUEUE_INFORM_EMAIL佇列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL(){
return new Queue(QUEUE_INFORM_EMAIL);
}
//宣告QUEUE_INFORM_SMS佇列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
// 繫結交換機和佇列
//ROUTINGKEY_EMAIL佇列繫結交換機,指定routingKey
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS佇列繫結交換機,指定routingKey
@Bean
public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
rabbitmq-consumer 訊息接收者
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static final String ROUTINGKEY_SMS="inform.#.sms.#";
//宣告交換機
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM(){
//durable(true) 持久化,mq重啟之後交換機還在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
// 宣告佇列
//宣告QUEUE_INFORM_EMAIL佇列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL(){
return new Queue(QUEUE_INFORM_EMAIL);
}
//宣告QUEUE_INFORM_SMS佇列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
// 繫結交換機和佇列
//ROUTINGKEY_EMAIL佇列繫結交換機,指定routingKey
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS佇列繫結交換機,指定routingKey
@Bean
public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
@SpringBootApplication
@EnableRabbit
public class TestRabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(TestRabbitMQApplication.class, args);
}
}
監聽訊息佇列
@Component
public class ReceiveHandler {
@RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_EMAIL})
public void receiveMsg(String msg) {
System.out.println("接收到的訊息是 = " + msg);
}
}
在rabbitmq-provider中測試
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {
// 使用rabbitTemplate傳送訊息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendEmail() {
String message = "send email message to user";
/**
* arg1: 交換機名稱
* arg2: 路由鍵
* arg3: 訊息內容
*/
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message);
}
}