1. 程式人生 > >RabbitMQ學習筆記-Work Queues

RabbitMQ學習筆記-Work Queues

Work Queues是為了避免在當前執行緒立即執行耗時的操作而導致執行緒阻塞。我們可以把要處理的任務封裝成訊息,傳送到訊息佇列。然後把訊息傳送到一個或多個工作執行緒。由工作執行緒負責執行耗時的操作。

用字串模擬耗時操作。消費者收到的字串中有幾個".",就睡眠幾秒鐘。

NewTask.java(訊息傳送者)

public class NewTask {
    private final static String QUEUE_NAME = "task";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new
ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); //傳送4條訊息,代表要執行的耗時任務 for
(int i = 0; i < 4; i++) { StringBuilder msgSb = new StringBuilder("Hello World!"); for (int j = 0; j < i; j++) { msgSb.append("."); } String message = msgSb.toString(); channel.basicPublish("", QUEUE_NAME, null
, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } } } } 複製程式碼

Worker.java(訊息接收者)

public class Worker {
    private final static String QUEUE_NAME = "task";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages.");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            long start = System.currentTimeMillis();
            try {
                //執行耗時的操作
                doWork(message);
            } finally {
                System.out.println(" [x] Done. Cost seconds: " + (System.currentTimeMillis() - start)/1000);
            }
        };
        boolean autoAck = true; // acknowledgment is covered below
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }


    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    //模擬耗時操作
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
複製程式碼

執行NewTask.main()

輸出如下:

 [x] Sent 'Hello World!'
 [x] Sent 'Hello World!.'
 [x] Sent 'Hello World!..'
 [x] Sent 'Hello World!...'

Process finished with exit code 0
複製程式碼

可以看到,NewTask傳送完訊息後,立即就退出了。

執行Worker.main()

Work收到訊息後,開始執行耗時的操作。輸出如下:

 [*] Waiting for messages.
 [x] Received 'Hello World!'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!.'
 [x] Done. Cost seconds: 1
 [x] Received 'Hello World!..'
 [x] Done. Cost seconds: 2
 [x] Received 'Hello World!...'
 [x] Done. Cost seconds: 3
複製程式碼

Round-robin dispatching

如果有多個消費者接收訊息會怎樣?

停止掉上一步啟動的Worker.main()。執行Worker.main()兩次,啟動了兩個Worker

(edit configurations->左側選擇Worker->右上角選中“All running parallel”->ok)

執行NewTask.main()

可以看到兩個Worker的輸出分別如下: Worker1

 [*] Waiting for messages.
 [x] Received 'Hello World!'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!..'
 [x] Done. Cost seconds: 2
複製程式碼

Worker2

 [*] Waiting for messages.
 [x] Received 'Hello World!.'
 [x] Done. Cost seconds: 1
 [x] Received 'Hello World!...'
 [x] Done. Cost seconds: 3
複製程式碼

可以看到,RabbitMQ按順序依次把訊息轉發給了兩個消費者,第一,三個訊息轉發給了Worker1,第二,四個訊息轉發給了第Worker2。

這就是使用Work Queues的好處,它還可以允許你建立多個消費者,實現並行工作。很容易進行擴充套件。

Message acknowledgment(訊息確認)

如果Worker在執行過程中,發生了異常或者意外退出了,再啟動後,未執行完的任務還會繼續執行嗎,驗證一下。

執行NewTask.main()

訊息傳送完畢,輸出如下:

 [x] Sent 'Hello World!'
 [x] Sent 'Hello World!.'
 [x] Sent 'Hello World!..'
 [x] Sent 'Hello World!...'
複製程式碼

執行Worker.main(),在執行過程中將Worker殺掉

輸出如下:

 [*] Waiting for messages.
 [x] Received 'Hello World!'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!.'
 [x] Done. Cost seconds: 1
 [x] Received 'Hello World!..'
 [x] Done. Cost seconds: 2
 [x] Received 'Hello World!...'

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
複製程式碼

再次執行Worker.main()

輸出如下:

 [*] Waiting for messages.
複製程式碼

可見,Worker並沒有收到未處理完的訊息,未處理完的任務丟失了。

這是因為預設情況下,RabbitMQ在將訊息傳送給消費者之後就立即將訊息刪除了。如果Worker意外退出,不僅會丟失正在處理的任務,還會丟失已經收到的還沒來得及處理的任務訊息。

為了避免訊息丟失,消費者可以在收到訊息並處理完之後,向RabbitMQ傳送一個確認訊息,RabbitMQ在收到確認訊息之後才會將已傳送的訊息刪除。如果消費者意外停止,同時還有別的消費者正常工作,RabbitMQ會把未處理的訊息轉發給正常工作中的消費者。否則待消費者恢復後,RabbitMQ會把未處理的訊息重新轉發給消費者。

要達到這個效果,只需將Worker.java中autoAck設定為false。並在處理完任務後,傳送確認訊息。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            long start = System.currentTimeMillis();
            try {
                //執行耗時的操作
                doWork(message);
            } finally {
                System.out.println(" [x] Done. Cost seconds: " + (System.currentTimeMillis() - start)/1000);
                //傳送確認訊息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
//關閉自動確認
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
複製程式碼

修改完之後,在Worker處理訊息過程中,將Worker殺掉,再重新啟動Worker,會發現Worker會繼續收到未處理完的訊息。

Fair dispatch

RabbitMQ預設通過Round-robin dispatching的方式轉發訊息給多個消費者,可有時候,這種方式並不合適。比如上面的例子,如果第奇數個訊息都代表特別耗時的操作,而第偶數個訊息代表不耗時的操作,即使Worker2已處理完任務處於空閒狀態,RabbitMQ已會持續將第奇數個訊息轉發給Worker1,這就會造成Worker1中的任務積壓。

通過例子驗證一下

為了效果明顯,修改下NewTask.java中傳送的訊息的個數,改為傳送10條訊息

            //傳送10條訊息,代表要執行的耗時任務
            for (int i = 0; i < 10; i++) {
                StringBuilder msgSb = new StringBuilder("Hello World!");
                for (int j = 0; j < i; j++) {
                    msgSb.append(".");
                }
                String message = msgSb.toString();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
複製程式碼

修改Worker.java 中doWork()方法

    private static void doWork(String task) {
        int dotCount = 0;
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                dotCount++;
            }
        }

        //如果有偶數個點,代表耗時操作
        if (dotCount % 2 == 0) {
            try {
                //模擬耗時操作
                Thread.sleep(1000*dotCount);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
複製程式碼

先啟動兩個Worker,再啟動NewTask

可以看到收到有奇數個點的訊息的Worker很快執行完了任務,而另一個Worker則一直在處理耗時的操作。 Worker1

 [*] Waiting for messages.
 [x] Received 'Hello World!.'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!...'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!.....'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!.......'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!.........'
 [x] Done. Cost seconds: 0
複製程式碼

Worker2

 [*] Waiting for messages.
 [x] Received 'Hello World!'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!..'
 [x] Done. Cost seconds: 2
 [x] Received 'Hello World!....'
 [x] Done. Cost seconds: 4
 [x] Received 'Hello World!......'
 [x] Done. Cost seconds: 6
 [x] Received 'Hello World!........'
 [x] Done. Cost seconds: 8
複製程式碼

為了避免這種情況,只需在Worker.java新增如下兩行程式碼

public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages.");

        //fair dispatch(新新增的兩行程式碼)
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            long start = System.currentTimeMillis();
            try {
                //執行耗時的操作
                doWork(message);
            } finally {
                System.out.println(" [x] Done. Cost seconds: " + (System.currentTimeMillis() - start) / 1000);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false; // acknowledgment is covered below
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }
複製程式碼

關閉Worker,再次重新啟動兩個Worker,啟動NewTask

輸出如下: Worker1

 [*] Waiting for messages.
 [x] Received 'Hello World!'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!...'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!....'
 [x] Done. Cost seconds: 4
 [x] Received 'Hello World!.......'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!........'
 [x] Done. Cost seconds: 8
複製程式碼

Worker2

 [*] Waiting for messages.
 [x] Received 'Hello World!.'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!..'
 [x] Done. Cost seconds: 2
 [x] Received 'Hello World!.....'
 [x] Done. Cost seconds: 0
 [x] Received 'Hello World!......'
 [x] Done. Cost seconds: 6
 [x] Received 'Hello World!.........'
 [x] Done. Cost seconds: 0
複製程式碼

可以看到,耗時的任務已經"均勻"的分配給了兩個Worker。channel.basicQos(1),告訴RabbitMQ,不要一次把所有訊息都給我,在我處理完訊息後,再給我一個訊息,否則把訊息轉發給別的已處理完訊息的消費者。

Message durability(訊息持久化)

問題又來了,如果訊息處理過程中RabbitMQ Server意外停止了呢?沒有轉發出去的訊息會丟失嗎?答案是會丟失的。為了避免這種情況,我們就需要將訊息佇列宣告為可持久化的。

首先,修改Worker.java和NewTask.java中Queue的名稱,改為task_durable

因為RabbitMQ不允許修改已存在的訊息佇列的屬性。

 private final static String QUEUE_NAME = "task_durable";
複製程式碼

宣告訊息佇列時,指定為可持久化的

 boolean durable = true;
 channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
複製程式碼

傳送訊息時,指定訊息為可持久化

            //傳送10條訊息,代表要執行的耗時任務
            for (int i = 0; i < 10; i++) {
                StringBuilder msgSb = new StringBuilder("Hello World!");
                for (int j = 0; j < i; j++) {
                    msgSb.append(".");
                }
                String message = msgSb.toString();
                //指定訊息為可持久化
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
複製程式碼

這樣,在訊息處理的過程中,即使RabbitMQ Server重啟,也不會丟失訊息了。