Oracle提示文字與格式字串不匹配
技術標籤:java
RabbitMQ
1.原始API(後面不用)
1.0依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
1.1 簡單模式
1.1.1. 編寫生產者
編寫訊息生產者
package com.itheima.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設為 localhost
connectionFactory.setHost("localhost");
//連線埠;預設為 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設為 /
connectionFactory.setVirtualHost("/itcast");
//連線使用者名稱;預設為guest
connectionFactory. setUsername("heima");
//連線密碼;預設為guest
connectionFactory.setPassword("heima");
//建立連線
Connection connection = connectionFactory.newConnection();
// 建立頻道
Channel channel = connection.createChannel();
// 宣告(建立)佇列
/**
* 引數1 queue:佇列名稱
* 引數2 durable:是否定義持久化佇列
* 引數3 exclusive:是否獨佔本次連線
* 引數4 autoDelete:是否在不使用的時候自動刪除佇列
* 引數5 arguments:佇列其它引數
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 要傳送的資訊
String message = "你好;小兔子!";
/**
* 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage
* 引數2:路由key,簡單模式可以傳遞佇列名稱
* 引數3:訊息其它屬性
* 引數4:訊息內容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已傳送訊息:" + message);
// 關閉資源
channel.close();
connection.close();
}
}
在執行上述的訊息傳送之後;可以登入rabbitMQ的管理控制檯,可以發現佇列和其訊息:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-n54jdN2T-1607397096331)(img\1556006638979.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-M83nBVhu-1607397096333)(img\1556006647177.png)]
1.1.2編寫消費者
package com.itheima.rabbitmq.simple;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設為 localhost
connectionFactory.setHost("localhost");
//連線埠;預設為 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設為 /
connectionFactory.setVirtualHost("/itcast");
//連線使用者名稱;預設為guest
connectionFactory.setUsername("heima");
//連線密碼;預設為guest
connectionFactory.setPassword("heima");
//建立連線
Connection connection = connectionFactory.newConnection();
// 建立頻道
Channel channel = connection.createChannel();
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//建立消費者;並設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認
* 引數3:訊息接收到後回撥
*/
channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
//不關閉資源,應該一直監聽訊息
//channel.close();
//connection.close();
}
}
1.2 Work queues工作佇列模式
1.2.1編寫生產者
和簡單模式沒有區別就是多傳送了幾條訊息
package com.itheima.rabbitmq.work;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//建立連線
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 1; i <= 30; i++) {
// 傳送資訊
String message = "你好;小兔子!work模式--" + i;
/**
* 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage
* 引數2:路由key,簡單模式可以傳遞佇列名稱
* 引數3:訊息其它屬性
* 引數4:訊息內容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已傳送訊息:" + message);
}
// 關閉資源
channel.close();
connection.close();
}
}
1.2.2編寫消費者
要測試工作佇列模式可以多複製幾次下面的程式碼每一個代表一個消費者
package com.itheima.rabbitmq.work;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//一次只能接收並處理一個訊息
channel.basicQos(1);
//建立消費者;並設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者1-接收到的訊息為:" + new String(body, "utf-8"));
Thread.sleep(1000);
//確認訊息
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認
* 引數3:訊息接收到後回撥
*/
channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
}
}
1.3 Publish/Subscribe釋出與訂閱模式
1.3.1生產者
package com.itheima.rabbitmq.ps;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 釋出與訂閱使用的交換機型別為:fanout
*/
public class Producer {
//交換機名稱
static final String FANOUT_EXCHAGE = "fanout_exchange";
//佇列名稱
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//佇列名稱
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
//建立連線
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
/**
* 宣告交換機
* 引數1:交換機名稱
* 引數2:交換機型別,fanout、topic、direct、headers
*/
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
//佇列繫結交換機
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
for (int i = 1; i <= 10; i++) {
// 傳送資訊
String message = "你好;小兔子!釋出訂閱模式--" + i;
/**
* 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage
* 引數2:路由key,簡單模式可以傳遞佇列名稱
* 引數3:訊息其它屬性
* 引數4:訊息內容
*/
channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
System.out.println("已傳送訊息:" + message);
}
// 關閉資源
channel.close();
connection.close();
}
}
1.3.2消費者
消費者1
package com.itheima.rabbitmq.ps;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
//佇列繫結交換機
channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");
//建立消費者;並設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者1-接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認
* 引數3:訊息接收到後回撥
*/
channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
}
}
消費者2
package com.itheima.rabbitmq.ps;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
//佇列繫結交換機
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");
//建立消費者;並設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者2-接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認
* 引數3:訊息接收到後回撥
*/
channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
}
}
1.4Routing路由模式
在編碼上與 Publish/Subscribe釋出與訂閱模式
的區別是交換機的型別為:Direct,還有佇列繫結交換機的時候需要指定routing key。
1)生產者
package com.itheima.rabbitmq.routing;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 路由模式的交換機型別為:direct
*/
public class Producer {
//交換機名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//佇列名稱
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//佇列名稱
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws Exception {
//建立連線
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
/**
* 宣告交換機
* 引數1:交換機名稱
* 引數2:交換機型別,fanout、topic、direct、headers
*/
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
//佇列繫結交換機
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
// 傳送資訊
String message = "新增了商品。路由模式;routing key 為 insert " ;
/**
* 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage
* 引數2:路由key,簡單模式可以傳遞佇列名稱
* 引數3:訊息其它屬性
* 引數4:訊息內容
*/
channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
System.out.println("已傳送訊息:" + message);
// 傳送資訊
message = "修改了商品。路由模式;routing key 為 update" ;
/**
* 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage
* 引數2:路由key,簡單模式可以傳遞佇列名稱
* 引數3:訊息其它屬性
* 引數4:訊息內容
*/
channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
System.out.println("已傳送訊息:" + message);
// 關閉資源
channel.close();
connection.close();
}
}
2)消費者1
package com.itheima.rabbitmq.routing;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
//佇列繫結交換機
channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");
//建立消費者;並設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者1-接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認
* 引數3:訊息接收到後回撥
*/
channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
}
}
3)消費者2
package com.itheima.rabbitmq.routing;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);
//佇列繫結交換機
channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");
//建立消費者;並設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者2-接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認
* 引數3:訊息接收到後回撥
*/
channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);
}
}
1.5Topics萬用字元模式
Topic
型別與Direct
相比,都是可以根據RoutingKey
把訊息路由到不同的佇列。只不過Topic
型別Exchange
可以讓佇列在繫結Routing key
的時候使用萬用字元!
Routingkey
一般都是有一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert
萬用字元規則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好1個詞
舉例:
item.#
:能夠匹配item.insert.abc
或者 item.insert
item.*
:只能匹配item.insert
1)生產者
使用topic型別的Exchange,傳送訊息的routing key有3種: item.insert
、item.update
、item.delete
:
package com.itheima.rabbitmq.topic;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 萬用字元Topic的交換機型別為:topic
*/
public class Producer {
//交換機名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//佇列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//佇列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
//建立連線
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
/**
* 宣告交換機
* 引數1:交換機名稱
* 引數2:交換機型別,fanout、topic、topic、headers
*/
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 傳送資訊
String message = "新增了商品。Topic模式;routing key 為 item.insert " ;
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已傳送訊息:" + message);
// 傳送資訊
message = "修改了商品。Topic模式;routing key 為 item.update" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已傳送訊息:" + message);
// 傳送資訊
message = "刪除了商品。Topic模式;routing key 為 item.delete" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
System.out.println("已傳送訊息:" + message);
// 關閉資源
channel.close();
connection.close();
}
}
2)消費者1
接收兩種型別的訊息:更新商品和刪除商品
package com.itheima.rabbitmq.topic;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);
//佇列繫結交換機
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");
//建立消費者;並設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者1-接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認
* 引數3:訊息接收到後回撥
*/
channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
}
}
3)消費者2
接收所有型別的訊息:新增商品,更新商品和刪除商品。
package com.itheima.rabbitmq.topic;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 宣告(建立)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨佔本次連線
* 引數4:是否在不使用的時候自動刪除佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
//佇列繫結交換機
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");
//建立消費者;並設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者2-接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認
* 引數3:訊息接收到後回撥
*/
channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
}
}
2.Spring整合RabbitMQ(用的也不多)
2.1. 搭建生產者工程
2.1.1. 建立工程
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-xICm3o8w-1607397096334)(img\1565149342994.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-XVeymlWZ-1607397096336)(img\1565144326498.png)]
2.1.2. 新增依賴
修改pom.xml檔案內容為如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>spring-rabbitmq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
</project>
2.1.3. 配置整合
- 建立
spring-rabbitmq-producer\src\main\resources\properties\rabbitmq.properties
連線引數等配置檔案;
rabbitmq.host=192.168.12.135
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast
- 建立
spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
整合配置檔案;
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--載入配置檔案-->
<context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義持久化佇列,不存在則自動建立;不繫結到交換機則繫結到預設交換機
預設交換機型別為direct,名字為:"",路由鍵為佇列的名稱
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~廣播;所有佇列都能收到訊息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定義廣播型別交換機;並繫結上述兩個佇列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~萬用字元;*匹配一個單詞,#匹配多個單詞 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義rabbitTemplate物件操作可以在程式碼中方便傳送訊息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
2.1.4. 傳送訊息
建立測試檔案 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 只發佇列訊息
* 預設交換機型別為 direct
* 交換機的名稱為空,路由鍵為佇列的名稱
*/
@Test
public void queueTest(){
//路由鍵與佇列同名
rabbitTemplate.convertAndSend("spring_queue", "只發佇列spring_queue的訊息。");
}
/**
* 傳送廣播
* 交換機型別為 fanout
* 繫結到該交換機的所有佇列都能夠收到訊息
*/
@Test
public void fanoutTest(){
/**
* 引數1:交換機名稱
* 引數2:路由鍵名(廣播設定為空)
* 引數3:傳送的訊息內容
*/
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "傳送到spring_fanout_exchange交換機的廣播訊息");
}
/**
* 萬用字元
* 交換機型別為 topic
* 匹配路由鍵的萬用字元,*表示一個單詞,#表示多個單詞
* 繫結到該交換機的匹配佇列能夠收到對應訊息
*/
@Test
public void topicTest(){
/**
* 引數1:交換機名稱
* 引數2:路由鍵名
* 引數3:傳送的訊息內容
*/
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj", "傳送到spring_topic_exchange交換機heima.bj的訊息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.1", "傳送到spring_topic_exchange交換機heima.bj.1的訊息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.2", "傳送到spring_topic_exchange交換機heima.bj.2的訊息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "itcast.cn", "傳送到spring_topic_exchange交換機itcast.cn的訊息");
}
}
2.2. 搭建消費者工程
2.2.1. 建立工程
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-hFqhXhzy-1607397096339)(img\1565149374831.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-G9NIVMGN-1607397096340)(img\1565144775216.png)]
2.2.2. 新增依賴
修改pom.xml檔案內容為如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>spring-rabbitmq-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
</dependencies>
</project>
2.2.3. 配置整合
- 建立
spring-rabbitmq-consumer\src\main\resources\properties\rabbitmq.properties
連線引數等配置檔案;
rabbitmq.host=192.168.12.135
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast
- 建立
spring-rabbitmq-consumer\src\main\resources\spring\spring-rabbitmq.xml
整合配置檔案;
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--載入配置檔案-->
<context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
<bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/>
<bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/>
<bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
</rabbit:listener-container>
</beans>
2.2.4. 訊息監聽器
1)佇列監聽器
建立 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\SpringQueueListener.java
public class SpringQueueListener implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名稱為:%s,路由鍵為:%s,佇列名為:%s的訊息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2)廣播監聽器1
建立 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\FanoutListener1.java
public class FanoutListener1 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("廣播監聽器1:接收路由名稱為:%s,路由鍵為:%s,佇列名為:%s的訊息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3)廣播監聽器2
建立 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\FanoutListener2.java
public class FanoutListener2 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("廣播監聽器2:接收路由名稱為:%s,路由鍵為:%s,佇列名為:%s的訊息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4)星號萬用字元監聽器
建立 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerStar.java
public class TopicListenerStar implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("萬用字元*監聽器:接收路由名稱為:%s,路由鍵為:%s,佇列名為:%s的訊息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
5)井號萬用字元監聽器
建立 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerWell.java
public class TopicListenerWell implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("萬用字元#監聽器:接收路由名稱為:%s,路由鍵為:%s,佇列名為:%s的訊息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
6)井號萬用字元監聽器2
建立 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerWell2.java
public class TopicListenerWell2 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("萬用字元#監聽器2:接收路由名稱為:%s,路由鍵為:%s,佇列名為:%s的訊息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.SpringBoot整合Rabbit後的API
###整合
在搭建了springboot工程後後增加下面的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然後配置RabbitMQ的相關資訊(ip地址,埠號,virtual-host,使用者名稱和密碼)即可實現整合
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /itcast
username: heima
password: heima
###生產者
在生產者端注意是進行佇列的定義,交換機的定義,交換機和佇列的繫結,傳送訊息。其中都是再配置類中進行配置
①定義RabbitMQ配置類
@Configuration
public class RabbitMQConfig {
}
定義佇列
@Configuration
public class RabbitMQConfig {
//佇列名稱
public static final String ITEM_QUEUE = "item_queue";
//宣告佇列
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
}
定義交換機以及交換機和佇列實現繫結
package com.itheima.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//交換機名稱
public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
//佇列名稱
public static final String ITEM_QUEUE = "item_queue";
//宣告交換機
@Bean("itemTopicExchange")
public Exchange topicExchange(){
//ExchangeBuilder.fanoutExchange() fanout交換機 方法引數為交換機名稱
//ExchangeBuilder.directExchange() direct交換機 方法引數為交換機名稱
//ExchangeBuilder.headersExchange() headers交換機 方法引數為交換機名稱
//ExchangeBuilder.topicExchange() topoc交換機 方法引數為交換機名稱
//durable()代表是否持久化,根據具體的業務需求決定
return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
//宣告佇列
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
//繫結佇列和交換機
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange){
//with的引數為交換機和佇列繫結的routingKey
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
②傳送訊息API
簡單模式直接傳送訊息到佇列: 注入RabbitTemplate 使用期convertAndSend方法.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PruducerApplication.class)
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
/**
* 方法引數介紹
* String exchange, 交換機名稱,簡單模式就寫""
* String routingKey, routingKey 如果是簡單模式就寫佇列明
* final Object object 要傳送的訊息
*
*/
rabbitTemplate.convertAndSend("", RabbitMQConfig.ITEM_QUEUE,"簡單佇列測試訊息");
}
}
萬用字元模式傳送訊息:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PruducerApplication.class)
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
/**
* 方法引數介紹
* String exchange, 交換機名稱,簡單模式就寫""
* String routingKey, routingKey 如果是簡單模式就寫佇列明
* final Object object 要傳送的訊息
*
*/
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.abc","萬用字元模式交換機測試訊息");
}
}
###消費者
消費者工程在整合了RabitMQ後只需要定義監聽器獲取訊息即可
隨便定義一個類定義接收訊息的方法。在方法上新增@RabbitListener註解指定要接受哪個佇列的訊息。即可
@Component
public class MsgListener {
/**
* 監聽某個佇列的訊息
* @param message 接收到的訊息
*/
@RabbitListener(queues = "item_queue")
public void myListener1(Message message){
System.out.println("消費者接收到的訊息為:" + new String(message.getBody()));
}
}
效果高階特性配置
confirm確認模式 訊息傳送到交換機
①配置開啟confirm
增加如下配置
spring:
rabbitmq:
publisher-confirms: true
②設定confirm回撥
package com.itheima.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.port:5672}")
private Integer port;
/**
* spring.rabbitmq.template.mandatory
* 訊息傳送失敗,是否回撥給傳送者
*/
@Value("${spring.rabbitmq.template.mandatory:false}")
private Boolean mandatory;
/**
* 是否確認
*/
@Value("${spring.rabbitmq.publisher-confirms:false}")
private Boolean publisherConfirms;
/**
* 如果mandatorys設定成true,該值也設定 成true
*/
@Value("${spring.rabbitmq.publisher-returns:false}")
private Boolean publisherReturns;
/**
* RabbitMQConfig中定義connectionFactory中設定屬性
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(this.host);
cachingConnectionFactory.setUsername(this.username);
cachingConnectionFactory.setPassword(this.password);
cachingConnectionFactory.setVirtualHost(this.virtualHost);
cachingConnectionFactory.setPort(this.port);
// 如果訊息要設定成回撥,則以下的配置必須要設定成true
cachingConnectionFactory.setPublisherConfirms(this.publisherConfirms);
cachingConnectionFactory.setPublisherReturns(this.publisherReturns);
return cachingConnectionFactory;
}
/**
* 同時為了呼叫SpringBoot整合rabbitMQ提供的傳送的方法,我們需要注入rabbitTemplate
*/
@Bean(name = "rabbitTemplate")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
template.setMandatory(mandatory);
// 訊息確認, yml需要配置 publisher-confirms: true
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("訊息傳送到exchange成功");
} else {
System.out.println("訊息傳送到exchange失敗"+cause);
}
});
return template;
}
}
return 回退模式
訊息從交換機到佇列的時候失敗。
注意:只有當訊息傳送給Exchange後,Exchange路由到Queue失敗,並且設定了訊息的處理模式不是丟棄訊息時 才會執行到ReturnCallBack
* 回退模式: 當訊息傳送給Exchange後,Exchange路由到Queue失敗是 才會執行 ReturnCallBack
* 步驟:
* 1. 開啟回退模式:publisher-returns="true"
* 2. 設定ReturnCallBack
* 3. 設定Exchange處理訊息的模式:
* 1. 如果訊息沒有路由到Queue,則丟棄訊息(預設)
* 2. 如果訊息沒有路由到Queue,返回給訊息傳送方ReturnCallBack
①配置訊息回退模式開啟
spring:
rabbitmq:
publisher-returns: true
②配置失敗的時候不丟棄訊息
spring:
rabbitmq:
template:
mandatory: true #訊息從交換機發送到佇列隨便的時候是否回退 如果為false就是直接丟棄訊息
③設定回撥callback
@Configuration
@Slf4j
public class RabbitMQConfig {
...省略,具體程式碼同上
/**
* 同時為了呼叫SpringBoot整合rabbitMQ提供的傳送的方法,我們需要注入rabbitTemplate
*/
@Bean(name = "rabbitTemplate")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
template.setMandatory(mandatory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
// 訊息返回, yml需要配置 publisher-returns: true 並且mandatory為ture
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationId();
System.err.println("訊息:"+message+":"+replyCode+":"+replyText);
// log.debug("訊息:{} 傳送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey);
});
return template;
}
}
③測試
測試的時候可以把路由鍵寫錯讓訊息不能正確傳送到佇列即可
消費端訊息確認
①配置訊息確認模式為手動確認
acknowledge-mode 的幾種值
none:自動確認
manual:手動確認
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
②修改listener,根據情況確定是確認還是拒絕
在監聽器的處理方法中修改引數列表Message message, Channel channel 。
拒絕訊息的時候使用:channel.basicNack ,該方法的requeue引數看具體業務情況來確定。
確認的時候使用:channel.basicAck
注意:
Channel的包為com.rabbitmq.client.Channel
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MsgListener {
/**
* 監聽某個佇列的訊息
* @param message 接收到的訊息
*/
@RabbitListener(queues = "item_queue")
public void myListener1(Message message, Channel channel) throws IOException {
/**
* @param deliveryTag 訊息的唯一標識
* @param multiple 是否對這條訊息之前的訊息也做相同的處理
* @param requeue 是否讓這條訊息重新進入佇列
*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
/**
* @param deliveryTag 訊息的唯一標識
* @param multiple 是否對這條訊息之前的訊息也做相同的處理
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消費者接收到的訊息為:" + message);
}
}
配置消費端限流 - 最大未處理訊息數
預設值為250
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-6VJBmkJU-1607397096341)(img\image-20200411160238861.png)]
①配置 要結合手動確認才有意義和效果
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: manual
prefetch: 1 # 允許的最大未處理訊息數
simple:
acknowledge-mode: manual
prefetch: 1 # 允許的最大未處理訊息數
TTL(time to live) 訊息過期時間
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-w2mj6azX-1607397096342)(D:\工作\產出\md\暢購api\RabbitMQ\img\image-20200411164755404.png)]
一般都給佇列設定過期時間,直接在控制檯配置就可以
死信佇列
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-n8VaEc7S-1607397096344)(img\image-20200411170118275.png)]
實現步驟:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-PnMhoBEq-1607397096347)(img\image-20200411210559275.png)]
延遲佇列
訊息進入佇列後延遲一段時間才可以被消費
在RabbitMQ中就是使用TTL+死信佇列即可