1. 程式人生 > >spring-boot 整合 rabbitmq

spring-boot 整合 rabbitmq

RabbitMQ

介紹

RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高階訊息佇列協議)的標準實現。

概念

  • Broker:簡單來說就是訊息佇列伺服器實體。
  • Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
  • Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
  • Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
  • Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
  • vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
  • producer:訊息生產者,就是投遞訊息的程式。
  • consumer:訊息消費者,就是接受訊息的程式。
  • channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。

使用過程

  1. 客戶端連線到訊息佇列伺服器,開啟一個channel。
  2. 客戶端宣告一個exchange,並設定相關屬性。
  3. 客戶端宣告一個queue,並設定相關屬性。
  4. 客戶端使用routing key,在exchange和queue之間建立好繫結關係。
  5. 客戶端投遞訊息到exchange。

exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。
exchange也有幾個型別,完全根據key進行投遞的叫做Direct交換機,例如,繫結時設定了routing key為”abc”,那麼客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號””匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.

”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它採取廣播模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。
RabbitMQ支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我想大多數使用者都會選擇持久化。訊息佇列持久化包括3個部分:

  1. exchange持久化,在宣告時指定durable => 1
  2. queue持久化,在宣告時指定durable => 1
  3. 訊息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立繫結。

Spring-boot 整合 rabbitmq

新增maven依賴

<dependency>  
      <groupId>org.springframework.boot</groupId>  
      <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>  

簡單實現

配置

在application.properties中增加如下配置

spring.rabbitmq.addresses=127.0.0.1:5672  
spring.rabbitmq.username=guest  
spring.rabbitmq.password=guest  
spring.rabbitmq.publisher-confirms=true  
spring.rabbitmq.virtual-host=/  

rabbitmq埠說明:5672-amqp,25672-clustering,61613-stomp,1883-mqtt

訊息生產者

package com.rabbitmq.send;  

import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  

@Component  
public class Sender {  

    @Autowired  
    private RabbitTemplate rabbitTemplate;  

    public void send(String msg) {  
        this.rabbitTemplate.convertAndSend("foo", msg);  
    }  
}  

訊息監聽者

package com.rabbitmq.listener;  

import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.messaging.handler.annotation.Payload;  

@Configuration  
@RabbitListener(queues = "foo")  
public class Listener {  

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);  

    @Bean  
    public Queue fooQueue() {  
        return new Queue("foo");  
    }  

    @RabbitHandler  
    public void process(@Payload String foo) {  
        LOGGER.info("Listener: " + foo);  
    }  
}  

測試Controller

package com.rabbitmq.controller;  

import com.rabbitmq.send.Sender;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.RestController;  

import javax.servlet.http.HttpServletRequest;  

@RestController  
public class RabbitmqController {  

    @Autowired  
    private Sender sender;  

    @GetMapping("/send")  
    public String send(HttpServletRequest request, String msg) {  
        sender.send(msg);  
        return "Send OK.";  
    }  

測試

INFO 5559 --- [cTaskExecutor-1] c.rabbitmq.listener.Listener  : Listener: this is a test  
[SimpleAsyncTaskExecutor-1] INFO  c.rabbitmq.listener.Listener - Listener: this is a test 

帶ConfirmCallback的使用

增加回調處理,這裡不再使用application.properties預設配置的方式,會在程式中顯示的使用檔案中的配置資訊。

配置

package com.rabbitmq.config;  

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.beans.factory.config.ConfigurableBeanFactory;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.context.annotation.Scope;  

@Configuration  
public class AmqpConfig {  

    public static final String FOO_EXCHANGE   = "callback.exchange.foo";  
    public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";  
    public static final String FOO_QUEUE      = "callback.queue.foo";  

    @Value("${spring.rabbitmq.addresses}")  
    private String addresses;  
    @Value("${spring.rabbitmq.username}")  
    private String username;  
    @Value("${spring.rabbitmq.password}")  
    private String password;  
    @Value("${spring.rabbitmq.virtual-host}")  
    private String virtualHost;  
    @Value("${spring.rabbitmq.publisher-confirms}")  
    private boolean publisherConfirms;  

    @Bean  
    public ConnectionFactory connectionFactory() {  
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
        connectionFactory.setAddresses(addresses);  
        connectionFactory.setUsername(username);  
        connectionFactory.setPassword(password);  
        connectionFactory.setVirtualHost(virtualHost);  
        /** 如果要進行訊息回撥,則這裡必須要設定為true */  
        connectionFactory.setPublisherConfirms(publisherConfirms);  
        return connectionFactory;  
    }  

    @Bean  
    /** 因為要設定回撥類,所以應是prototype型別,如果是singleton型別,則回撥類為最後一次設定 */  
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    public RabbitTemplate rabbitTemplate() {  
        RabbitTemplate template = new RabbitTemplate(connectionFactory());  
        return template;  
    }  

}  

訊息生產者

package com.rabbitmq.send;  

import com.rabbitmq.config.AmqpConfig;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.support.CorrelationData;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  

import java.util.UUID;  

@Component  
public class Sender implements RabbitTemplate.ConfirmCallback {  

    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);  

    private RabbitTemplate rabbitTemplate;  

    @Autowired  
    public Sender(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
        this.rabbitTemplate.setConfirmCallback(this);  
    }  

    public void send(String msg) {  
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());  
        LOGGER.info("send: " + correlationData.getId());  
        this.rabbitTemplate.convertAndSend(AmqpConfig.FOO_EXCHANGE, AmqpConfig.FOO_ROUTINGKEY, msg, correlationData);  
    }  

    /** 回撥方法 */  
    @Override  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
        LOGGER.info("confirm: " + correlationData.getId());  
    }  
}  

訊息監聽者

package com.rabbitmq.listener;  

import com.rabbitmq.config.AmqpConfig;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.amqp.core.Binding;  
import org.springframework.amqp.core.BindingBuilder;  
import org.springframework.amqp.core.DirectExchange;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.messaging.handler.annotation.Payload;  

@Configuration  
@RabbitListener(queues = AmqpConfig.FOO_QUEUE)  
public class Listener {  

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);  

    /** 設定交換機型別  */  
    @Bean  
    public DirectExchange defaultExchange() {  
        /** 
         * DirectExchange:按照routingkey分發到指定佇列 
         * TopicExchange:多關鍵字匹配 
         * FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念 
         * HeadersExchange :通過新增屬性key-value匹配 
         */  
        return new DirectExchange(AmqpConfig.FOO_EXCHANGE);  
    }  

    @Bean  
    public Queue fooQueue() {  
        return new Queue(AmqpConfig.FOO_QUEUE);  
    }  

    @Bean  
    public Binding binding() {  
        /** 將佇列繫結到交換機 */  
        return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(AmqpConfig.FOO_ROUTINGKEY);  
    }  

    @RabbitHandler  
    public void process(@Payload String foo) {  
        LOGGER.info("Listener: " + foo);  
    }  
}  

或者使用下面的程式碼來代替@RabbitHandler註解的process方法

@Bean  
public SimpleMessageListenerContainer messageContainer() {  
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
    container.setQueues(fooQueue());  
    container.setExposeListenerChannel(true);  
    container.setMaxConcurrentConsumers(1);  
    container.setConcurrentConsumers(1);  
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認  
    container.setMessageListener(new ChannelAwareMessageListener() {  

        @Override  
        public void onMessage(Message message, Channel channel) throws Exception {  
            byte[] body = message.getBody();  
            LOGGER.info("Listener onMessage : " + new String(body));  
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認訊息成功消費  
        }  
    });  
    return container;  
}  

測試

執行服務,重新整理上面的測試連結,可以在控制檯看到如下輸出

INFO 15122 --- [nio-8080-exec-1] com.jikefriend.rabbitmq.send.Sender      : send: c678afb7-af8b-42b2-9370-ea7f9d6004a0  
[http-nio-8080-exec-1] INFO  com.jikefriend.rabbitmq.send.Sender - send: c678afb7-af8b-42b2-9370-ea7f9d6004a0  
INFO 15122 --- [ 127.0.0.1:5672] com.jikefriend.rabbitmq.send.Sender      : confirm: c678afb7-af8b-42b2-9370-ea7f9d6004a0  
[AMQP Connection 127.0.0.1:5672] INFO  com.jikefriend.rabbitmq.send.Sender - confirm: c678afb7-af8b-42b2-9370-ea7f9d6004a0  
INFO 15122 --- [cTaskExecutor-1] c.jikefriend.rabbitmq.listener.Listener  : Listener: this is a test  
[SimpleAsyncTaskExecutor-1] INFO  c.j.rabbitmq.listener.Listener - Listener: this is a test  

技術交流學習或者有任何問題歡迎加群

程式設計技術交流群 : 154514123 愛上程式設計

Java技術交流群 : 6128790  Java