RabbitMQ之WorkQueue模式
阿新 • • 發佈:2021-10-04
RabbitMQ之WorkQueue模式
概念
簡單例子
其實和簡單入手的例子幾乎差不多 我們先建立好work_queues
佇列
編寫兩個消費者類等待消費
public class WorkConsumer { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.198.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立連線 connection = connectionFactory.newConnection("消費者"); //獲取通道 channel = connection.createChannel(); //通過通道宣告佇列,建立交換機等一系列事情 channel.basicConsume("work_queues", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("收到訊息為" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("收取訊息失敗"); } }); //卡一下 System.out.println("鍵盤輸入關閉消費者"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //關閉通道 if(channel != null && channel.isOpen()){ try { channel.close(); } catch (Exception e){ e.printStackTrace(); } } if(connection != null && connection.isOpen()){ try { connection.close(); } catch (Exception e){ e.printStackTrace(); } } } } }
再寫一個程式碼完全一致的類執行 即出現兩個消費者
編寫生產者類 生產一定數量的訊息
public class WorkProducer { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.198.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立連線 connection = connectionFactory.newConnection("生產者"); //獲取通道 channel = connection.createChannel(); //通過通道宣告佇列,建立交換機等一系列事情 String queueName = "work_queues"; //引數:佇列名字 是否持久化 是否獨自 是否自動刪除 引數 channel.queueDeclare(queueName,false,false,false,null); for (int i = 0; i < 10; i++) { String message = i+"Hello rabbitmq"; channel.basicPublish("",queueName,null,message.getBytes()); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //關閉通道 if(channel != null && channel.isOpen()){ try { channel.close(); } catch (Exception e){ e.printStackTrace(); } } if(connection != null && connection.isOpen()){ try { connection.close(); } catch (Exception e){ e.printStackTrace(); } } } } }
先執行兩個消費者 然後執行生產者
可以看到生產的訊息被兩個消費者大致平均的取走了