RabbitMQ訊息
阿新 • • 發佈:2020-07-08
Message (訊息) 是指伺服器和應用程式之間傳輸的資料,它由 Properties 和 Payload (Body) 組成。
一、訊息屬性
1.1 訊息常用屬性
屬性名稱 | 屬性含義 |
---|---|
Delivery mode | 是否持久化,1:Non-persistent,2:Persistent |
headers | 自定義屬性 |
content_type | 訊息內容的型別 |
content_encoding | 訊息內容的編碼格式 |
priority | 訊息的優先順序 |
correlation_id | 關聯id |
reply_to | 用於指定回覆的佇列的名稱 |
expiration | 訊息的失效時間 |
message_id | 訊息id |
timestamp | 訊息的時間戳 |
type | 型別 |
user_id | 使用者id |
app_id | 應用程式id |
cluster_id | 叢集id |
1.2 例項
消費者
public class Consumer { public static void main(String[] args) throws Exception { // 1.建立連線工廠物件 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設定主機 connectionFactory.setHost("111.231.83.100"); // 設定埠 connectionFactory.setPort(5672); // 設定虛擬主機 connectionFactory.setVirtualHost("/"); // 2.獲取一個連線物件 final Connection connection = connectionFactory.newConnection(); // 3.建立 Channel final Channel channel = connection.createChannel(); String exchangeType = "topic"; // 4.宣告交換機 String exchangeName = "messageExchange"; channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 5.申明佇列 String queueName = "messageQueue"; channel.queueDeclare(queueName, false, false, false, null); // 6.將交換機和佇列進行繫結關係 String routingKey1 = "#"; channel.queueBind(queueName, exchangeName, routingKey1); // 7.迴圈消費 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); System.err.println("消費端啟動"); while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費端消費: " + msg); System.out.println("訊息持久化屬性:" + delivery.getProperties().getDeliveryMode()); System.out.println("訊息編碼格式:" + delivery.getProperties().getContentEncoding()); System.out.println("訊息過期時間:" + delivery.getProperties().getExpiration()); System.out.println("==========自定義屬性值=========="); Map<String, Object> headers = delivery.getProperties().getHeaders(); for (String key : headers.keySet()) { System.out.println("key: " + key + " ,value: " + headers.get(key)); } } } }
生產者:
public class Producer { public static void main(String[] args) throws Exception { // 1.建立連線工廠物件 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設定主機 connectionFactory.setHost("111.231.83.100"); // 設定埠 connectionFactory.setPort(5672); // 設定虛擬主機 connectionFactory.setVirtualHost("/"); // 2.獲取一個連線物件 final Connection connection = connectionFactory.newConnection(); // 3.建立 Channel final Channel channel = connection.createChannel(); // 4.宣告交換機 String exchangeName = "messageExchange"; // 5.設定訊息屬性 Map<String, Object> headers = new HashMap<>(); headers.put("key1", 1); headers.put("key2", 2); headers.put("key3", 3); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() // 設定訊息是否持久化 .deliveryMode(2) // 設定訊息編碼格式 .contentEncoding("UTF-8") // 設定訊息過期時間 .expiration("10000") // 設定自定義屬性 .headers(headers) .build(); // 6.傳送訊息 channel.basicPublish(exchangeName, "message", properties, "擁有有屬性的訊息".getBytes()); // 7.關閉資源 channel.close(); connection.close(); connectionFactory.clone(); } }
先啟動消費者,然後啟動生產者,檢視控制檯輸出:
消費端啟動
消費端消費: 擁有有屬性的訊息
訊息持久化屬性:2
訊息編碼格式:UTF-8
訊息過期時間:10000
==========自定義屬性值==========
key: key1 ,value: 1
key: key2 ,value: 2
key: key3 ,value: 3
二、訊息應用
2.1 TTL 訊息
對於訊息來說可以為它設定一個過期時間,如果超過這個時間還沒有被消費的話就自動刪除。
在程式碼中有兩種方法設定某個佇列的訊息過期時間:
- 針對佇列來說,可以使用 x-message-ttl 引數設定當前佇列中所有訊息的過期時間,即當前佇列中所有的訊息過期時間都一樣;
- 針對單個訊息來說,在釋出訊息時,可以使用 Expiration 引數來設定單個訊息的過期時間。
以上兩個引數的單位都是毫秒,即1000毫秒為1秒。如果以上兩個都設定,則以當前訊息最短的那個過期時間為準。
2.2 延時訊息
RabbitMQ 不支援延時訊息的使用,可以採用以下方式實現:
-
先儲存到資料庫,用定時任務掃描,登記時刻+延時時間,就是需要投遞的時刻;
-
利用 RabbitMQ 的死信佇列實現;
-
利用 rabbitmq-delayed-message-exchange 外掛;