1. 程式人生 > 實用技巧 >Spring Boot整合RabbitMQ實踐

Spring Boot整合RabbitMQ實踐

一、建立生產者服務

1、建立生產者服務 rabbit-producer

spring boot版本為2.1.16.RELEASE

2、pom.xml

引入spring-boot-starter-amqp

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

  

3、配置application.properties

server.servlet.context-path=/
server.port=8002

# 叢集用逗號分割
spring.rabbitmq.addresses=xx.xx.xx.xx:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 使用啟動訊息確認模式
spring.rabbitmq.publisher-confirms=true

#設定return訊息模式,注意要與mandatory一起去配合使用
#spring.rabbitmq.publisher-returns=true
#spring.rabbitmq.template.mandatory=true


spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=non_null

  

4、傳送

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 這裡就是確認訊息的回撥監聽介面,用於確認訊息是否被broker所收到
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

        /**
         * @param correlationData 作為一個唯一的標識
         * @param ack broker是否落盤成功
         * @param cause 失敗的異常資訊
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("訊息ACK結果:" + ack + ",correlationData: " +correlationData.getId())  ;
        }
    };


    /**
     * 對外發送訊息的方法
     * @param message 具體的訊息內容
     * @param properties 額外的附加屬性
     * @throws Exception
     */
    public void send(Object message, Map<String,Object> properties) throws Exception{
        MessageHeaders mhs = new MessageHeaders(properties);
        Message<?>  msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);

        //指定業務唯一的ID
        String uuid = UUID.randomUUID().toString();
        System.out.println("生成業務唯一Id=" + uuid);
        CorrelationData correlationData = new CorrelationData(uuid);

        MessagePostProcessor mpp = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                System.out.println("postProcessMessage message: " +message);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("myexchange1", "myroutingkey.1", msg, mpp, correlationData);

    }
}

  

5、測試傳送訊息

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitProducerApplicationTests {

    @Autowired
    private  RabbitSender rabbitSender;

    @Test
    public void testSender() throws  Exception {
        Map<String,Object> properties = new HashMap<>();
        properties.put("name","zhangsan");
        properties.put("age","18");
        rabbitSender.send("hello rabbit", properties);

        Thread.sleep(10000);
    }



}

  

二、RabbitMQ消費者服務

1、建立RabbitMQ消費者服務 rabbit-consumer

spring boot版本為2.1.16.RELEASE

2、pom.xml

引入spring-boot-starter-amqp

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

  

3、配置application.properties

server.servlet.context-path=/
server.port=8002

# 叢集用逗號分割
spring.rabbitmq.addresses=xx.xx.xx.xx:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 表示消費者消費成功訊息以後需要手工的資訊簽收(ack),預設為auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=1

# RabbitListener 相關配置
spring.rabbitmq.listener.order.exchange.name=myexchange1
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.key=myroutingkey.*

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=non_null

  

4、建立接收者類

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;


/**
 * @description: 訊息接收者
 * @author:
 * @create: 2020-08-01 09:35
 */
@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myqueue1", durable = "true"),
                    exchange = @Exchange(name = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable= "${spring.rabbitmq.listener.order.exchange.durable}",
                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "true"),
                    key = "${spring.rabbitmq.listener.order.exchange.key}"
                )
         )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception{
        // 步驟1、收到訊息後進行業務端消費處理
        System.out.println("消費訊息" + message.getPayload());

        //步驟2、處理成功後,獲取deliveryTag,並進行手工ACK操作,因為配置檔案配置的是手工簽收模式
        // spring.rabbitmq.listener.simple.acknowledge-mode=manual
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}

RabbitListener相關屬性配置在屬性檔案中。

消費者採用手工配置 channel.basicAck