1. 程式人生 > 其它 >RabbitMQ 之 Work queues

RabbitMQ 之 Work queues

一、概述

Work queues 該模式下有一個生產者(Producer)、兩個消費者(Consumer01、Consumer02),多個消費者一起處理生產者傳送過來的訊息,預設情況下是採用輪詢分發

二、編碼

2.1、Producer

public class Producer {
    private static final String QUEUE_NAME = "helloWorld";
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);

        String message = "有意思的訊息--->";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("Producer send message successfully");
    }
}

2.2、Consumer01

public class Consumer01{
    private static final String QUEUE_NAME = "helloWorld";
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,true,null);

        DeliverCallback deliverCallback = (consumerTag, delivery ) ->{
            String message = new String(delivery.getBody());
            System.out.println(message);
        };

        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag);
        };

        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

        System.out.println("Consumer01 消費訊息完成...");
    }
}

2.3、Consumer02

public class Consumer02 {
    private static final String QUEUE_NAME = "helloWorld";
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,true,null);

        DeliverCallback deliverCallback = (consumerTag, delivery ) ->{
            String message = new String(delivery.getBody());
            System.out.println(message);
        };

        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag);
        };

        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

        System.out.println("Consumer02 消費訊息完成...");
    }
}

三、測試

先啟動 Consumer01、Consumer02,然後再啟動 Producer 傳送訊息,可以看出兩個消費者是以輪詢的方式消費訊息的