1. 程式人生 > 實用技巧 >RabbitMQ訊息

RabbitMQ訊息

Message (訊息) 是指伺服器和應用程式之間傳輸的資料,它由 PropertiesPayload (Body) 組成。

一、訊息屬性

1.1 訊息常用屬性

屬性名稱 屬性含義
Delivery mode 是否持久化,1:Non-persistent,2:Persistent
headers 自定義屬性
content_type 訊息內容的型別
content_encoding 訊息內容的編碼格式
priority 訊息的優先順序
correlation_id 關聯id
reply_to 用於指定回覆的佇列的名稱
expiration 訊息的失效時間
message_id 訊息id
timestamp 訊息的時間戳
type 型別
user_id 使用者id
app_id 應用程式id
cluster_id 叢集id

1.2 例項

消費者

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1.建立連線工廠物件
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設定主機
        connectionFactory.setHost("111.231.83.100");
        // 設定埠
        connectionFactory.setPort(5672);
        // 設定虛擬主機
        connectionFactory.setVirtualHost("/");

        // 2.獲取一個連線物件
        final Connection connection = connectionFactory.newConnection();

        // 3.建立 Channel
        final Channel channel = connection.createChannel();

        String exchangeType = "topic";

        // 4.宣告交換機
        String exchangeName = "messageExchange";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 5.申明佇列
        String queueName = "messageQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 6.將交換機和佇列進行繫結關係
        String routingKey1 = "#";
        channel.queueBind(queueName, exchangeName, routingKey1);

        // 7.迴圈消費
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        System.err.println("消費端啟動");
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消費端消費: " + msg);

            System.out.println("訊息持久化屬性:" + delivery.getProperties().getDeliveryMode());
            System.out.println("訊息編碼格式:" + delivery.getProperties().getContentEncoding());
            System.out.println("訊息過期時間:" + delivery.getProperties().getExpiration());
            System.out.println("==========自定義屬性值==========");
            Map<String, Object> headers = delivery.getProperties().getHeaders();
            for (String key : headers.keySet()) {
                System.out.println("key: " + key + " ,value: " + headers.get(key));
            }


        }
    }
}

生產者:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.建立連線工廠物件
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設定主機
        connectionFactory.setHost("111.231.83.100");
        // 設定埠
        connectionFactory.setPort(5672);
        // 設定虛擬主機
        connectionFactory.setVirtualHost("/");

        // 2.獲取一個連線物件
        final Connection connection = connectionFactory.newConnection();

        // 3.建立 Channel
        final Channel channel = connection.createChannel();


        // 4.宣告交換機
        String exchangeName = "messageExchange";
        // 5.設定訊息屬性
        Map<String, Object> headers = new HashMap<>();
        headers.put("key1", 1);
        headers.put("key2", 2);
        headers.put("key3", 3);
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                // 設定訊息是否持久化
                .deliveryMode(2)
                // 設定訊息編碼格式
                .contentEncoding("UTF-8")
                // 設定訊息過期時間
                .expiration("10000")
                // 設定自定義屬性
                .headers(headers)
                .build();
        // 6.傳送訊息
        channel.basicPublish(exchangeName, "message", properties, "擁有有屬性的訊息".getBytes());

        // 7.關閉資源
        channel.close();
        connection.close();
        connectionFactory.clone();
    }
}

先啟動消費者,然後啟動生產者,檢視控制檯輸出:

消費端啟動
消費端消費: 擁有有屬性的訊息
訊息持久化屬性:2
訊息編碼格式:UTF-8
訊息過期時間:10000
==========自定義屬性值==========
key: key1 ,value: 1
key: key2 ,value: 2
key: key3 ,value: 3

二、訊息應用

2.1 TTL 訊息

對於訊息來說可以為它設定一個過期時間,如果超過這個時間還沒有被消費的話就自動刪除。

在程式碼中有兩種方法設定某個佇列的訊息過期時間:

  • 針對佇列來說,可以使用 x-message-ttl 引數設定當前佇列中所有訊息的過期時間,即當前佇列中所有的訊息過期時間都一樣;
  • 針對單個訊息來說,在釋出訊息時,可以使用 Expiration 引數來設定單個訊息的過期時間。

以上兩個引數的單位都是毫秒,即1000毫秒為1秒。如果以上兩個都設定,則以當前訊息最短的那個過期時間為準。

2.2 延時訊息

RabbitMQ 不支援延時訊息的使用,可以採用以下方式實現:

  • 先儲存到資料庫,用定時任務掃描,登記時刻+延時時間,就是需要投遞的時刻;

  • 利用 RabbitMQ 的死信佇列實現;

  • 利用 rabbitmq-delayed-message-exchange 外掛;