1. 程式人生 > 其它 >第六章 玩轉Java整合RabbitMQ 工作佇列模型實戰

第六章 玩轉Java整合RabbitMQ 工作佇列模型實戰

筆記來自《小滴課堂-新版Redis6學習筆記》
第1集 Java專案建立整合RabbitMQ

簡介:Java專案建立並整合RabbitMQ

  • 建立Maven專案
  • 專案建立需要點時間-大家靜待就行

    • 刪除部分沒用的配置
    • 修改jdk版本,如果是使用jdk8的,改為1.8即可
xxxxxxxxxx
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <maven.compiler.source>11</maven.compiler.source>
   <maven.compiler.target>11</maven.compiler.target>
  </properties>
  <dependencies>
   <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
   </dependency>
  </dependencies>
  • 涉及重複程式碼,關閉idea重複程式碼檢測

第2集 玩轉RabbitMQ簡單佇列實戰

簡介:玩轉RabbitMQ簡單佇列實戰

xxxxxxxxxx
public class Send {
   private final static String QUEUE_NAME = "hello";
   public static void main(String[] argv) throws Exception {
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("10.211.55.13");
     factory.setUsername("admin");
     factory.setPassword("password");
     factory.setVirtualHost("/dev");
     factory.setPort(5672);
        try (  //JDK7語法 或自動關閉 connnection和channel
         //建立連線
         Connection connection = factory.newConnection();
         //建立通道
         Channel channel = connection.createChannel()) {
       /**
       * 佇列名稱
       * 持久化配置:mq重啟後還在
       * 是否獨佔:只能有一個消費者監聽佇列;當connection關閉是否刪除佇列,一般是false,釋出訂閱是獨佔
       * 自動刪除: 當沒有消費者的時候,自動刪除掉,一般是false
       * 其他引數
       * 
       * 佇列不存在則會自動建立,如果存在則不會覆蓋,所以此時的時候需要注意屬性
       */
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       String message = "Hello World!";
       /**
       * 引數說明:
       * 交換機名稱:不寫則是預設的交換機,那路由健需要和佇列名稱一樣才可以被路由,
       * 路由健名稱
       * 配置資訊
       * 傳送的訊息資料:位元組陣列
       */
       channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
       System.out.println(" [x] Sent '" + message + "'");
     }
   }
}
  • 訊息消費者(會一直監聽佇列)
xxxxxxxxxx
public class Recv {
   private final static String QUEUE_NAME = "hello";
   public static void main(String[] argv) throws Exception {
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("10.211.55.13");
     factory.setUsername("admin");
     factory.setPassword("password");
     factory.setVirtualHost("/xdclass1");
     factory.setPort(5672);
     //消費者一般不增加自動關閉
     Connection connection = factory.newConnection();
     Channel channel = connection.createChannel();
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     //回撥方法,下面兩種都行
     Consumer consumer = new DefaultConsumer(channel){
       @Override
       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
         // consumerTag 是固定的 可以做此會話的名字, deliveryTag 每次接收訊息+1
         System.out.println("consumerTag訊息標識="+consumerTag);
         //可以獲取交換機,路由健等
         System.out.println("envelope元資料="+envelope);
         System.out.println("properties配置資訊="+properties);
         System.out.println("body="+new String(body,"utf-8"));
       }
     };
     channel.basicConsume(QUEUE_NAME,true,consumer);
//     DeliverCallback deliverCallback = (consumerTag, envelop, delivery,properties, msg) -> {
//       String message = new String(msg, "UTF-8");
//       System.out.println(" [x] Received '" + message + "'");
//     };
     //自動確認訊息
     //channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
   }
}