1. 程式人生 > >訊息佇列模型深入理解

訊息佇列模型深入理解

訊息佇列模型深入理解

RabbitMQ提供了6種訊息模型,但是第6種其實是RPC,並不是MQ,因此不予學習。那麼也就剩下5種。

但是其實3、4、5這三種都屬於訂閱模型,只不過進行路由的方式不同。

2.1 基本訊息模型

RabbitMQ是一個訊息代理:它接受和轉發訊息。 你可以把它想象成一個郵局:當你把郵件放在郵箱裡時,你可以確定郵差先生最終會把郵件傳送給你的收件人。 在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。

RabbitMQ與郵局的主要區別是它不處理紙張,而是接受,儲存和轉發資料訊息的二進位制資料塊。

P(producer/ publisher):生產者,一個傳送訊息的使用者應用程式。

C(consumer):消費者,消費和接收有類似的意思,消費者是一個主要用來等待接收訊息的使用者應用程式

佇列(紅色區域):rabbitmq內部類似於郵箱的一個概念。雖然訊息流經rabbitmq和你的應用程式,但是它們只能儲存在佇列中。佇列只受主機的記憶體和磁碟限制,實質上是一個大的訊息緩衝區。許多生產者可以傳送訊息到一個佇列,許多消費者可以嘗試從一個佇列接收資料。

總之:

生產者將訊息傳送到佇列,消費者從佇列中獲取訊息,佇列是儲存訊息的緩衝區。

我們將用Java編寫兩個程式;傳送單個訊息的生產者,以及接收訊息並將其打印出來的消費者。我們將詳細介紹Java API中的一些細節,這是一個訊息傳遞的“Hello World”。

我們將呼叫我們的訊息釋出者(傳送者)Send和我們的訊息消費者(接收者)Recv。釋出者將連線到RabbitMQ,傳送一條訊息,然後退出。

2.1.1.生產者傳送訊息

public class Send {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從連線中建立通道,這是完成大部分API的地方。
        Channel channel = connection.createChannel();

        // 宣告(建立)佇列,必須宣告佇列才能夠傳送訊息,我們可以把訊息傳送到佇列中。
        // 宣告一個佇列是冪等的 - 只有當它不存在時才會被建立
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 訊息內容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //關閉通道和連線
        channel.close();
        connection.close();
    }
}

控制檯:

2.1.2.管理工具中檢視訊息

進入佇列頁面,可以看到新建了一個佇列:simple_queue

點選佇列名稱,進入詳情頁,可以檢視訊息:

在控制檯檢視訊息並不會將訊息消費,所以訊息還在。

2.1.3.消費者獲取訊息

public class Recv {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 建立通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        // 監聽佇列,第二個引數:是否自動進行訊息確認。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

控制檯:

這個時候,佇列中的訊息就沒了:

我們發現,消費者已經獲取了訊息,但是程式沒有停止,一直在監聽佇列中是否有新的訊息。一旦有新的訊息進入佇列,就會立即列印.

2.1.4.訊息確認機制(ACK)

通過剛才的案例可以看出,訊息一旦被消費者接收,佇列中的訊息就會被刪除。

那麼問題來了:RabbitMQ怎麼知道訊息被接收了呢?

如果消費者領取訊息後,還沒執行操作就掛掉了呢?或者丟擲了異常?訊息消費失敗,但是RabbitMQ無從得知,這樣訊息就丟失了!

因此,RabbitMQ有一個ACK機制。當消費者獲取訊息後,會向RabbitMQ傳送回執ACK,告知訊息已經被接收。不過這種回執ACK分兩種情況:

  • 自動ACK:訊息一旦被接收,消費者自動傳送ACK
  • 手動ACK:訊息接收後,不會發送ACK,需要手動呼叫

大家覺得哪種更好呢?

這需要看訊息的重要性:

  • 如果訊息不太重要,丟失也沒有影響,那麼自動ACK會比較方便
  • 如果訊息非常重要,不容丟失。那麼最好在消費完成後手動ACK,否則接收訊息後就自動ACK,RabbitMQ就會把訊息從佇列中刪除。如果此時消費者宕機,那麼訊息就丟失了。

我們之前的測試都是自動ACK的,如果要手動ACK,需要改動我們的程式碼:

public class Recv2 {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 建立通道
        final Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
                // 手動進行ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 監聽佇列,第二個引數false,手動進行ACK
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

注意到最後一行程式碼:

channel.basicConsume(QUEUE_NAME, false, consumer);

如果第二個引數為true,則會自動進行ACK;如果為false,則需要手動ACK。方法的宣告:

2.1.4.1.自動ACK存在的問題

修改消費者,新增異常,如下:

生產者不做任何修改,直接執行,訊息傳送成功:

執行消費者,程式丟擲異常。但是訊息依然被消費:

管理介面:

2.1.4.2.演示手動ACK

修改消費者,把自動改成手動(去掉之前製造的異常)

生產者不變,再次執行:

執行消費者

但是,檢視管理介面,發現:

停掉消費者的程式,發現:

這是因為雖然我們設定了手動ACK,但是程式碼中並沒有進行訊息確認!所以訊息並未被真正消費掉。

當我們關掉這個消費者,訊息的狀態再次稱為Ready

修改程式碼手動ACK:

執行:

訊息消費成功!

2.2.work訊息模型

工作佇列或者競爭消費者模式

在第一篇教程中,我們編寫了一個程式,從一個命名佇列中傳送並接受訊息。在這裡,我們將建立一個工作佇列,在多個工作者之間分配耗時任務。

工作佇列,又稱任務佇列。主要思想就是避免執行資源密集型任務時,必須等待它執行完成。相反我們稍後完成任務,我們將任務封裝為訊息並將其傳送到佇列。 在後臺執行的工作程序將獲取任務並最終執行作業。當你執行許多工人時,任務將在他們之間共享,但是一個訊息只能被一個消費者獲取。

這個概念在Web應用程式中特別有用,因為在短的HTTP請求視窗中無法處理複雜的任務。

接下來我們來模擬這個流程:

​ P:生產者:任務的釋出者

​ C1:消費者,領取任務並且完成任務,假設完成速度較快

​ C2:消費者2:領取任務並完成任務,假設完成速度慢

面試題:避免訊息堆積?

1) 採用workqueue,多個消費者監聽同一佇列。

2)接收到訊息以後,而是通過執行緒池,非同步消費。

2.2.1.生產者

生產者與案例1中的幾乎一樣:

public class Send {
    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 迴圈釋出任務
        for (int i = 0; i < 50; i++) {
            // 訊息內容
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 2);
        }
        // 關閉通道和連線
        channel.close();
        connection.close();
    }
}

不過這裡我們是迴圈傳送50條訊息。

2.2.2.消費者1

2.2.3.消費者2

與消費者1基本類似,就是沒有設定消費耗時時間。

這裡是模擬有些消費者快,有些比較慢。

接下來,兩個消費者一同啟動,然後傳送50條訊息:

可以發現,兩個消費者各自消費了25條訊息,而且各不相同,這就實現了任務的分發。

2.2.4.能者多勞

剛才的實現有問題嗎?

  • 消費者1比消費者2的效率要低,一次任務的耗時較長
  • 然而兩人最終消費的訊息數量是一樣的
  • 消費者2大量時間處於空閒狀態,消費者1一直忙碌

現在的狀態屬於是把任務平均分配,正確的做法應該是消費越快的人,消費的越多。

怎麼實現呢?

我們可以使用basicQos方法和prefetchCount = 1設定。 這告訴RabbitMQ一次不要向工作人員傳送多於一條訊息。 或者換句話說,不要向工作人員傳送新訊息,直到它處理並確認了前一個訊息。 相反,它會將其分派給不是仍然忙碌的下一個工作人員。

再次測試:

2.3.訂閱模型分類

在之前的模式中,我們建立了一個工作佇列。 工作佇列背後的假設是:每個任務只被傳遞給一個工作人員。 在這一部分,我們將做一些完全不同的事情 - 我們將會傳遞一個資訊給多個消費者。 這種模式被稱為“釋出/訂閱”。

訂閱模型示意圖:

解讀:

1、1個生產者,多個消費者

2、每一個消費者都有自己的一個佇列

3、生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機

4、每個佇列都要繫結到交換機

5、生產者傳送的訊息,經過交換機到達佇列,實現一個訊息被多個消費者獲取的目的

X(Exchanges):交換機一方面:接收生產者傳送的訊息。另一方面:知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。

Exchange型別有以下幾種:

​ Fanout:廣播,將訊息交給所有繫結到交換機的佇列

​ Direct:定向,把訊息交給符合指定routing key 的佇列

​ Topic:萬用字元,把訊息交給符合routing pattern(路由模式) 的佇列

我們這裡先學習

​ Fanout:即廣播模式

Exchange(交換機)只負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange繫結,或者沒有符合路由規則的佇列,那麼訊息會丟失!

2.4.訂閱模型-Fanout

Fanout,也稱為廣播。

流程圖:

在廣播模式下,訊息傳送流程是這樣的:

  • 1) 可以有多個消費者
  • 2) 每個消費者有自己的queue(佇列)
  • 3) 每個佇列都要繫結到Exchange(交換機)
  • 4) 生產者傳送的訊息,只能傳送到交換機,交換機來決定要發給哪個佇列,生產者無法決定。
  • 5) 交換機把訊息傳送給繫結過的所有佇列
  • 6) 佇列的消費者都能拿到訊息。實現一條訊息被多個消費者消費

2.4.1.生產者

兩個變化:

  • 1) 宣告Exchange,不再宣告Queue
  • 2) 傳送訊息到Exchange,不再發送到Queue
public class Send {

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        
        // 宣告exchange,指定型別為fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        
        // 訊息內容
        String message = "Hello everyone";
        // 釋出訊息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [生產者] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

2.4.2.消費者1

public class Recv {
    private final static String QUEUE_NAME = "fanout_exchange_queue_1";

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

要注意程式碼中:佇列需要和交換機繫結

2.4.3.消費者2

public class Recv2 {
    private final static String QUEUE_NAME = "fanout_exchange_queue_2";

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.4.4.測試

我們執行兩個消費者,然後傳送1條訊息:

2.5.訂閱模型-Direct

有選擇性的接收訊息

在訂閱模式中,生產者釋出訊息,所有消費者都可以獲取所有訊息。

在路由模式中,我們將新增一個功能 - 我們將只能訂閱一部分訊息。 例如,我們只能將重要的錯誤訊息引導到日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯上列印所有日誌訊息。

但是,在某些場景下,我們希望不同的訊息被不同的佇列消費。這時就要用到Direct型別的Exchange。

在Direct模型下,佇列與交換機的繫結,不能是任意綁定了,而是要指定一個RoutingKey(路由key)

訊息的傳送方在向Exchange傳送訊息時,也必須指定訊息的routing key。

P:生產者,向Exchange傳送訊息,傳送訊息時,會指定一個routing key。

X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給 與routing key完全匹配的佇列

C1:消費者,其所在佇列指定了需要routing key 為 error 的訊息

C2:消費者,其所在佇列指定了需要routing key 為 info、error、warning 的訊息

2.5.1.生產者

此處我們模擬商品的增刪改,傳送訊息的RoutingKey分別是:insert、update、delete

public class Send {
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告exchange,指定型別為direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 訊息內容
        String message = "商品新增了, id = 1001";
        // 傳送訊息,並且指定routing key 為:insert ,代表新增商品
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
        System.out.println(" [商品服務:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

2.5.2.消費者1

我們此處假設消費者1只接收兩種型別的訊息:更新商品和刪除商品。

public class Recv {
    private final static String QUEUE_NAME = "direct_exchange_queue_1";
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 繫結佇列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete訊息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.5.3.消費者2

我們此處假設消費者2接收所有型別的訊息:新增商品,更新商品和刪除商品。

public class Recv2 {
    private final static String QUEUE_NAME = "direct_exchange_queue_2";
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 繫結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.5.4.測試

我們分別傳送增、刪、改的RoutingKey,發現結果:

2.6.訂閱模型-Topic

Topic型別的ExchangeDirect相比,都是可以根據RoutingKey把訊息路由到不同的佇列。只不過Topic型別Exchange可以讓佇列在繫結Routing key 的時候使用萬用字元!

Routingkey 一般都是有一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert

萬用字元規則:

#:匹配一個或多個詞

*:匹配不多不少恰好1個詞

舉例:

audit.#:能夠匹配audit.irs.corporate 或者 audit.irs

audit.*:只能匹配audit.irs

在這個例子中,我們將傳送所有描述動物的訊息。訊息將使用由三個字(兩個點)組成的routing key傳送。路由關鍵字中的第一個單詞將描述速度,第二個顏色和第三個種類:“..”。

我們建立了三個繫結:Q1綁定了繫結鍵“* .orange.”,Q2綁定了“.*.rabbit”和“lazy.#”。

Q1匹配所有的橙色動物。

Q2匹配關於兔子以及懶惰動物的訊息。

練習,生產者傳送如下訊息,會進入那個佇列:

quick.orange.rabbit à Q1 Q2

lazy.orange.elephant à Q1 Q2

quick.orange.fox à Q1

lazy.pink.rabbit à Q2

quick.brown.fox à 不匹配任意佇列,被丟棄

quick.orange.male.rabbit à

orange à

2.6.1.生產者

使用topic型別的Exchange,傳送訊息的routing key有3種: item.isnertitem.updateitem.delete

public class Send {
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告exchange,指定型別為topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 訊息內容
        String message = "新增商品 : id = 1001";
        // 傳送訊息,並且指定routing key 為:insert ,代表新增商品
        channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
        System.out.println(" [商品服務:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

2.6.2.消費者1

我們此處假設消費者1只接收兩種型別的訊息:更新商品和刪除商品

public class Recv {
    private final static String QUEUE_NAME = "topic_exchange_queue_1";
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 繫結佇列到交換機,同時指定需要訂閱的routing key。需要 update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.6.3.消費者2

我們此處假設消費者2接收所有型別的訊息:新增商品,更新商品和刪除商品。

/**
 * 消費者2
 */
public class Recv2 {
    private final static String QUEUE_NAME = "topic_exchange_queue_2";
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 繫結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.7.持久化

如何避免訊息丟失?

1) 消費者的ACK機制。可以防止消費者丟失訊息。

2) 但是,如果在消費者消費之前,MQ就宕機了,訊息就沒了。

是可以將訊息進行持久化呢?

要將訊息持久化,前提是:佇列、Exchange都持久化

2.7.1.交換機持久化

2.7.2.佇列持久化

2.7.3.訊息持久化

解決訊息丟失?

  • ack(消費者確認)
  • 持久化(保證訊息佇列可靠)
  • 傳送訊息前,將訊息持久化到資料庫,並記錄訊息狀態(可靠訊息服務)
  • 生產者確認(publisher confirm)

冪等性(同一介面被重複執行,其結果一致)

3.Spring AMQP

2.2.依賴和配置

新增AMQP的啟動器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml中新增RabbitMQ地址:

spring:
  rabbitmq:
    host: 192.168.56.101
    username: leyou
    password: leyou
    virtual-host: /leyou

2.3.監聽者

在SpringAmqp中,對訊息的消費者進行了封裝和抽象,一個普通的JavaBean中的普通方法,只要通過簡單的註解,就可以成為一個消費者。

@Component
public class Listener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "spring.test.queue", durable = "true"),
            exchange = @Exchange(
                    value = "spring.test.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"#.#"}))
    public void listen(String msg){
        System.out.println("接收到訊息:" + msg);
    }
}
  • @Componet:類上的註解,註冊到Spring容器
  • @RabbitListener:方法上的註解,宣告這個方法是一個消費者方法,需要指定下面的屬性:
    • bindings:指定繫結關係,可以有多個。值是@QueueBinding的陣列。@QueueBinding包含下面屬性:
      • value:這個消費者關聯的佇列。值是@Queue,代表一個佇列
      • exchange:佇列所繫結的交換機,值是@Exchange型別
      • key:佇列和交換機繫結的RoutingKey

類似listen這樣的方法在一個類中可以寫多個,就代表多個消費者。

2.4.AmqpTemplate

Spring最擅長的事情就是封裝,把他人的框架進行封裝和整合。

Spring為AMQP提供了統一的訊息處理模板:AmqpTemplate,非常方便的傳送訊息,其傳送方法:

紅框圈起來的是比較常用的3個方法,分別是:

  • 指定交換機、RoutingKey和訊息體
  • 指定訊息
  • 指定RoutingKey和訊息,會向預設的交換機發送訊息

2.5.測試程式碼

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testSend() throws InterruptedException {
        String msg = "hello, Spring boot amqp";
        this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
        // 等待10秒後再結束
        Thread.sleep(10000);
    }
}

執行後檢視日誌:

相關推薦

訊息佇列模型深入理解

訊息佇列模型深入理解 RabbitMQ提供了6種訊息模型,但是第6種其實是RPC,並不是MQ,因此不予學習。那麼也就剩下5種。 但是其實3、4、5這三種都屬於訂閱模型,只不過進行路由的方式不同。 2.1 基本訊息模型 RabbitMQ是一個訊息代理:它接受和轉發訊息。 你可以把它想象成一個郵局:當你把郵件

基於JVM原理、JMM模型和CPU快取模型深入理解Java併發程式設計

許多以Java多執行緒開發為主題的技術書籍,都會把對Java虛擬機器和Java記憶體模型的講解,作為講授Java併發程式設計開發的主要內容,有的還深入到計算機系統的記憶體、CPU、快取等予以說明。實際上,在實際的Java開發工作中,僅僅瞭解併發程式設計的建立、啟動、管理和通訊等基本知識還是不夠的。一

淺談非同步訊息佇列模型

什麼是訊息佇列? 所謂訊息佇列,就是一個以佇列資料結構為基礎的一個實體,這個實體是真實存在的,比如程式中的陣列,資料庫中的表,或者redis等等,都可以。 首先我們說說為什麼要使用佇列,什麼情況下才會使用佇列? 為什麼需要訊息佇列? 當系統中出現“生產“和“消費“的速度或穩定性等

基於JVM原理JMM模型和CPU快取模型深入理解Java併發程式設計

許多以Java多執行緒開發為主題的技術書籍,都會把對Java虛擬機器和Java記憶體模型的講解,作為講授Java併發程式設計開發的主要內容,有的還深入到計算機系統的記憶體、CPU、快取等予以說明。實際上,在實際的Java開發工作中,僅僅瞭解併發程式設計的建立、啟動、管理和通訊等基本知識還是不夠的。一方面,如果

基於JVM原理JMM模型和CPU緩存模型深入理解Java並發編程

可靠的 解決 start 關鍵字 juc .com 失效 接下來 直接 許多以Java多線程開發為主題的技術書籍,都會把對Java虛擬機和Java內存模型的講解,作為講授Java並發編程開發的主要內容,有的還深入到計算機系統的內存、CPU、緩存等予以說明。實際上,在實際的J

分散式訊息佇列模型 實戰

介紹 作為一種基礎的抽象資料結構,佇列被廣泛應用在各類程式設計中。大資料時代對跨程序、跨機器的通訊提出了更高的要求,和以往相比,分散式佇列程式設計的運用幾乎已無處不在。但是,這種常見的基礎性的事物往往容易被忽視,使用者往往會忽視兩點: 使用分散式佇列的時候,沒有意識到它

Handler、Message、Loop訊息佇列模型,各部分的作用

Android系統的訊息佇列和訊息迴圈都是針對具體執行緒的,一個執行緒可以存在(當然也可以不存在)一個訊息佇列(Message Queue)和一個訊息迴圈(Looper)。Android中除了UI執行

java實現簡單的訊息佇列模型(BIO)

本例項主要模擬一個一對一的訊息佇列處理: 宣告佇列: package com.bai.testbio; import java.util.LinkedList; public class JmsBuffer { // 佇列 最大儲存量 private final static int

TCP/IP四層模型深入理解

TCP/IP四層模型的補充學習 四層模型從使用者態程式/核心態程式上的劃分 鏈路層的另一種協議PPP(和乙太網協議對比) 四層模型從使用者態程式/核心態程式上的劃分 TCP/IP協議四層模型: 應用層——–使用者態程式,我的理解是socket程式

通過JVM記憶體模型深入理解值傳遞和引用傳遞兩種方式

值傳遞和引用傳遞分析Java中資料型別分為兩大類:基本型別和引用型別(也就是物件型別)。基本型別:boolean、char、byte、short、int、long、float、double引用型別:類、介面、陣列因此,變數型別也可分為兩大類:基本型別和引用型別。在分析值傳遞

Objective-C 物件模型深入理解

0x00序 本著加深對Objective-C 物件模型的理解和記憶目的,於是有了下文的簡單實踐操作。 0x01 疑問 在以下程式碼中,你能描述清楚以下問題嗎? TestClass的例項物件tcA和tcB的記憶體結構是怎麼樣的 TestClass

CSS盒模型深入理解

ext 視覺效果 frame checkbox 分享圖片 eight 標準模式 兩個 塊元素 前言 所有文檔元素都生成一個矩形框,這稱為元素框(element box),它描述了一個元素在文檔布局中所占的空間大小。而且,每個框影響著其他元素框的位置和大小 寬高 寬度w

深入理解 CSS3 彈性盒布局模型

分辨率 top 應用 時間 控制 用戶 lock fire 應用開發 彈性盒布局模型(Flexible Box Layout)是 CSS3 規範中提出的一種新的布局方式。該布局模型的目的是提供一種更加高效的方式來對容器中的條目進行布局、對齊和分配空間。這種布局

深入理解JMM(Java內存模型) --(四)volatile

iter() 通過 我們 body writer 其它 pen barrier 如何實現 volatile的特性 當我們聲明共享變量為volatile後,對這個變量的讀/寫將會很特別。理解volatile特性的一個好方法是:把對volatile變量的單個讀/寫,看成是使

深入理解JMM(Java內存模型) --(五)鎖

畫出 類圖 protected processor 線程之間的通信 ret ocl tex amp 鎖的釋放-獲取建立的happens before 關系 鎖是Java並發編程中最重要的同步機制。鎖除了讓臨界區互斥執行外,還可以讓釋放鎖的線程向獲取同一個鎖的線程發送消息。

深入理解JMM(Java內存模型) --(二)重排序

單個 擔心 但是 thread 共享 att 無需 排序 ava [轉載自並發編程網 – ifeve.com 原文鏈接:http://ifeve.com/tag/jmm/] 數據依賴性 如果兩個操作訪問同一個變量,且這兩個操作中有一個為寫操作,此時這兩個操作之間就存

深入理解JVM內存模型

關聯 tab 理解 row 多人 嚴重 html obj 編譯期 我們知道,計算機CPU和內存的交互是最頻繁的,內存是我們的高速緩存區,用戶磁盤和CPU的交互,而CPU運轉速度越來越快,磁盤遠遠跟不上CPU的讀寫速度,才設計了內存,用戶緩沖用戶IO等待導致CPU的等待成本,

深入理解CSS盒模型

文檔 play alt 文章 text apt get parent 組成 如果你在面試的時候面試官讓你談談對盒模型的理解,你是不是不知從何談起。這種看似簡單的題其實是最不好答的。 下面本文章將會從以下幾個方面談談盒模型。 基本概念:標準模型 和IE模型 CSS如

深入理解Java內存模型(五)——鎖

運行 包含 示意圖 支持 ole img api turn before 本文轉自:http://www.infoq.com/cn/articles/java-memory-model-5 鎖的釋放-獲取建立的happens before 關系 鎖是java並發編程中最重要

深入理解Java內存模型(三)——順序一致性

內存空間 寫入 方便 語言 body 一半 同步 java語言 post 本文轉自:http://www.infoq.com/cn/articles/java-memory-model-3 數據競爭與順序一致性保證 當程序未正確同步時,就會存在數據競爭。java內存模型規範