1. 程式人生 > >RabbitMQ學習筆記(3)----RabbitMQ Worker的使用

RabbitMQ學習筆記(3)----RabbitMQ Worker的使用

1. Woker佇列結構圖

  

  這裡表示一個生產者生產了訊息傳送到佇列中,但是確有兩個消費者在消費同一個佇列中的訊息。

2. 建立一個生產者

  Producer如下:

package com.wangx.rabbitmq.worker;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer { /** * 佇列名字 */ private static final String QUEUE_NAME = "worker-queue"; public static void main(String[] args) throws IOException, TimeoutException { //建立連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定伺服器主機 factory.setHost("127.0.0.1");
//設定使用者名稱 factory.setUsername("wangx"); //設定密碼 factory.setPassword("wangx"); //設定VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; Channel channel = null; try { //建立連線 connection = factory.newConnection();
//建立訊息通道 channel = connection.createChannel(); //宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //傳送訊息 for (int i = 0; i < 10; i++) { //傳送訊息 channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes()); System.out.println(" [x] Sent '" + message + i + "'"); } }catch (Exception e) { e.printStackTrace(); } finally { channel.close(); connection.close(); } } }

  這裡同時向佇列傳送了十條訊息。

3. 建立兩個消費者

  Consumer1如下:

package com.wangx.rabbitmq.worker;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    /**
     * 佇列名字
     */
    private static final String QUEUE_NAME = "worker-queue";
    public static void main(String[] args) throws IOException, TimeoutException {

        //建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定伺服器主機
        factory.setHost("localhost");
        //設定使用者
        factory.setUsername("wangx");
        //設定密碼
        factory.setPassword("wangx");
        //設定VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        try {
            //建立連線
            connection = factory.newConnection();
            //建立訊息通道
            final Channel  channel = connection.createChannel();
            //宣告佇列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel){
                //重寫DefaultConsumer中handleDelivery方法,在方法中獲取訊息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //訊息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer1 收到訊息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer1 訊息消費完成....");
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }

                }
            };
            //監聽訊息
            channel.basicConsume(QUEUE_NAME, false,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}

  Consumer2

package com.wangx.rabbitmq.worker;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    /**
     * 佇列名字
     */
    private static final String QUEUE_NAME = "worker-queue";
    public static void main(String[] args) throws IOException, TimeoutException {

        //建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定伺服器主機
        factory.setHost("localhost");
        //設定使用者
        factory.setUsername("wangx");
        //設定密碼
        factory.setPassword("wangx");
        //設定VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        try {
            //建立連線
            connection = factory.newConnection();
            //建立訊息通道
            final Channel  channel = connection.createChannel();
//            channel.basicQos(1);
            //宣告佇列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel){
                //重寫DefaultConsumer中handleDelivery方法,在方法中獲取訊息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //訊息沉睡100ms
                        Thread.sleep(100);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer2 收到訊息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer2 訊息消費完成....");
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }

                }
            };
            //監聽訊息
            channel.basicConsume(QUEUE_NAME, false,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}

  可以看到consumer1在訊息處理的過程中,沉睡了1s,而consumer2沉睡了100ms,以前面的mq的慣性來說,應該是沉睡時間少的消費多一些訊息,但是我們來看控制檯:

Consumer1:

consumer1 收到訊息 'Hello World!0'
consumer1 訊息消費完成....
consumer1 收到訊息 'Hello World!2'
consumer1 訊息消費完成....
consumer1 收到訊息 'Hello World!4'
consumer1 訊息消費完成....
consumer1 收到訊息 'Hello World!6'
consumer1 訊息消費完成....
consumer1 收到訊息 'Hello World!8'
consumer1 訊息消費完成....

Consumer2:

consumer2 收到訊息 'Hello World!1'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!3'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!5'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!7'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!9'
consumer2 訊息消費完成....

  可以看訊息的消費是平均分發的,一個消費奇數,一個偶數訊息。但是有時候我們並不希望說訊息平均消費,而是讓消費快的多消費,慢的少消費。

4. "能者多勞"模式

  ”能者多勞“即是消費速度快的消費者消費更多的訊息,速度慢的消費少的訊息。

  使用這種模式只需要設定消費者的channel的basicQos即可。

  如下:

  channel.basicQos(1);表示訊息伺服器每次只向消費分發一條訊息。可以設定多條,只需要在任意的消費者中設定就對所有consumer生效。

控制檯列印結果:

Consumer1:
    
consumer1 收到訊息 'Hello World!1'
consumer1 訊息消費完成....

Consumer2:
 
consumer2 收到訊息 'Hello World!0'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!2'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!3'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!4'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!5'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!6'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!7'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!8'
consumer2 訊息消費完成....
consumer2 收到訊息 'Hello World!9'
consumer2 訊息消費完成....

  此時Consumer1才消費了1條,Consumer2消費 了其餘的九條,這就是”能者多勞“模式的體現。

5. 訊息的確認模式

  消費者從佇列中獲取訊息,服務端是如何知道訊息已經被消費完成了呢?

  模式1:自動確認

  只要訊息從佇列中被獲取,無論消費者取到訊息後是否成功消費訊息,都認為訊息已經成功消費。

  使用方式為:將channel.basicConsume();方法的第二個引數設定為true,如下:

channel.basicConsume(QUEUE_NAME, true,consumer);

  模式2: 手動確認模式

  消費者從佇列中獲取訊息之後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態。

  使用方式為:將channel.basicConsume();方法的第二個引數設定為true,如下:

channel.basicConsume(QUEUE_NAME, false,consumer);

  然後在訊息的DefaultConsumer.handleDelivery中使用channel.basicAck();方法在訊息消費完成時通知服務端消費已經完成。如下:

channel.basicAck(envelope.getDeliveryTag(),false);