1. 程式人生 > 實用技巧 >SpringBoot整合RabbitMQ 手動應答 簡單demo

SpringBoot整合RabbitMQ 手動應答 簡單demo

版本說明

  • JDK 1.8
  • RabbitMQ 3.7.15 Erlang 22.0
  • SpringBoot 2.3.3.RELEASE

// TODO 2021年1月8日 整理CentOS安裝RabbitMQ流程

1. 在RabbitMQ的Web管理介面,建立test佇列

引數的含義
durability:是否持久化(重啟或宕機後訊息依然儲存)

  • durable 持久
  • transient 暫時

新建maven專案。

2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.demo</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <dependencies>
        <!--web 模組-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

		<!-- AMQP -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!--  lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
            <scope>provided</scope>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3. application.yaml

server:
  port: 20002
spring:
  rabbitmq:
  	# 這裡我改了本地的hosts,實際地址是192.168.0.121
    host: vm.com
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # 開啟訊息確認模式
    # 訊息傳送到交換機確認機制,是否確認回撥
    # publisher-confirms: true
    # 是否返回回撥
    publisher-returns: true
    template:
      #開啟mandatory: true, basic.return方法將訊息返還給生產者
      mandatory: true
    listener:
      simple:
      	# 手動應答
        acknowledge-mode: manual
        # 最少消費者數量
        concurrency: 1 
        # 最多消費者數量
        max-concurrency: 10
        # 支援重試
        retry:
          enabled: true 

  • 5672:RabbitMQ的通訊埠

  • 15672:Web管理介面埠

4. RabbitmqDemo.java

@SpringBootApplication
@EnableRabbit
public class RabbitmqDemoApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}

5. RabbitConfig.java

@Configuration
@Slf4j
public class RabbitConfig {

    private RabbitTemplate rabbitTemplate;

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

}

配置RabbitMQ的訊息模板。

6. 訊息生產者 produce.java

@Component
public class Producer {
    
    // @Qualifier("rabbitTemplate")
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        for (int i = 0; i < 5; i++) {
            System.out.println("生產者傳送訊息,序號為: " + i);
            rabbitTemplate.convertAndSend("test", String.valueOf(i));
        }
    }
}

初始化訊息傳送模板RabbitTemplate,@Qualifier註解用於限定具體的實現類,這裡可以不指定。

7. 訊息消費者 consumer.java

消費者1和消費者2均監聽test佇列。

不同的是,消費者1收到訊息後返回確認應答basicAck。

而消費者2收到訊息後返回拒絕應答basicRegect,訊息被消費者拒絕後重新回到test佇列中,等待下次傳送給消費者。

@Component
@Slf4j
public class Consumer {

    /**
     * 消費者1 模擬正常處理訊息的情況,訊息處理完畢傳送確認應答
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "test")
    public void process1(Message message, Channel channel) throws IOException {
        log.info("消費者1 接收訊息: " + new String(message.getBody()));
        log.info("消費者1 確認應答訊息:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    /**
     * 消費者2 模擬處理訊息出錯的情況,消費者2向rabbitmq傳送拒絕應答。
     * 處理失敗的訊息會被重新放入ready中,再次傳送給消費者,直至收到確認應答
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "test")
    public void process2(Message message, Channel channel) throws IOException {
        log.info("消費者2 接收訊息:" + new String(message.getBody()));
        log.info("消費者2 拒絕應答訊息:" + new String(message.getBody()));
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
}

8. 測試RabbitMqController.java

@RestController
@RequestMapping("")
public class RabbitMqController {

    @Autowired
    private Producer producer;

    @GetMapping("/send")
    public String send() {
        producer.send();
        return "傳送完成";
    }
}

9. 測試

使用postman或瀏覽器使用Get方法請求http://localhost:20001/send,生產者會向RabbitMQ的test佇列傳送5條訊息:

生產者傳送訊息,序號為: 0
生產者傳送訊息,序號為: 1
生產者傳送訊息,序號為: 2
生產者傳送訊息,序號為: 3
生產者傳送訊息,序號為: 4

可以看出序號為2的訊息3次被消費者2接收,消費者2也3次傳送拒絕應答,直到第4次才被消費者1接收,並返回確認應答。

END