RabbitMQ: Work queues
阿新 • Published: 2021-11-09
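1. Overview
The Work queues pattern used here has one producer (Producer) and two consumers (Consumer01, Consumer02). The consumers share the work of processing the messages sent by the producer, and by default the broker dispatches messages to them round-robin, so each consumer is handed the next message in turn.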
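To make the round-robin rule concrete, here is a minimal sketch in plain Java that models the dispatch order without connecting to a broker. The class name RoundRobinSketch, the dispatch method, and the "message-N" labels are hypothetical and exist only for illustration; the sketch assumes both consumers are already subscribed before the first message arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone model of round-robin dispatch; it does not talk to RabbitMQ.
public class RoundRobinSketch {

    // Hands out messages in turn: the message at index k goes to consumer (k mod n).
    public static Map<String, List<String>> dispatch(List<String> consumers, List<String> messages) {
        Map<String, List<String>> assigned = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assigned.put(consumer, new ArrayList<>());
        }
        for (int k = 0; k < messages.size(); k++) {
            String target = consumers.get(k % consumers.size());
            assigned.get(target).add(messages.get(k));
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Ten messages, numbered 1..10 like the producer loop in this article.
        List<String> messages = new ArrayList<>();
        for (int i = 1; i < 11; i++) {
            messages.add("message-" + i);
        }
        Map<String, List<String>> result =
                dispatch(Arrays.asList("Consumer01", "Consumer02"), messages);
        result.forEach((consumer, received) -> System.out.println(consumer + " <- " + received));
    }
}
```

In this model the first consumer in the list receives messages 1, 3, 5, 7, 9 and the second receives 2, 4, 6, 8, 10. In a real run, which consumer gets the first message depends on the order in which the consumers subscribed.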
2. Code
2.1 Producer
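import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Producer {
    private static final String QUEUE_NAME = "helloWorld";
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        // try-with-resources closes the channel and connection so the JVM can exit
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // durable = false, exclusive = false, autoDelete = true, no extra arguments
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            String message = "Interesting message--->";
            // Publish 10 messages via the default exchange, routed by queue name
            for (int i = 1; i < 11; i++) {
                channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("Producer sent messages successfully");
        }
    }
}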
2.2 Consumer01
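import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Consumer01 {
    private static final String QUEUE_NAME = "helloWorld";
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        // Left open on purpose so the consumer keeps receiving messages
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Same arguments as the Producer's declaration
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);

        // Called for every message delivered to this consumer
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(message);
        };
        // Called if the broker cancels this consumer
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag);
        };
        // autoAck = true
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        // basicConsume returns immediately; deliveries arrive asynchronously
        System.out.println("Consumer01 is waiting for messages...");
    }
}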
2.3 Consumer02
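import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Consumer02 {
    private static final String QUEUE_NAME = "helloWorld";
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        // Left open on purpose so the consumer keeps receiving messages
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Same arguments as the Producer's declaration
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);

        // Called for every message delivered to this consumer
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(message);
        };
        // Called if the broker cancels this consumer
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag);
        };
        // autoAck = true
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        // basicConsume returns immediately; deliveries arrive asynchronously
        System.out.println("Consumer02 is waiting for messages...");
    }
}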
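3. Testing
Start Consumer01 and Consumer02 first, so that both are subscribed to the queue before any message arrives, then start Producer to send the messages. The console output shows that the two consumers receive the messages in round-robin fashion: with ten messages, each consumer prints five of them, alternating one by one.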