1. 程式人生 > >SpringBoot與消息(RabbitMQ)

SpringBoot與消息(RabbitMQ)

多路復用 ont 參考 turn ssa change 屬性 min org

1. JMS和AMQP

  • JMS(Java Message Service):
    • ActiveMQ是JMS實現;
  • AMQP(Advanced Message Queuing Protocol)
    • 兼容JMS
    • RabbitMQ是AMQP的實現

2. RabbitMQ 簡介

  • Message:由消息頭和消息體組成,消息體是不透明的,而消息頭則由一系列的可選屬性組成;
  • Publisher:一個向交換器發布消息的客戶端應用程序;
  • Exchange:用來接收生產者發送的消息並將這些消息路由給服務器中的隊列;
    • 有四種類型:direct(默認),fanout,topic和headers;
  • Queue
    :用來保存消息直到發送給消費者,是消息的容器;
  • Binding:用於消息隊列和交換器之間的關聯;
  • Connection:網絡連接,比如一個TCP連接;
  • Channel:多路復用連接中的一條獨立的雙向數據流通道;
  • Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序;
  • Virtual Host:虛擬主機,表示一批交換器,消息隊列和相關對象;每個vhost本質上就是一個mini版的RabbitMQ服務器;
  • Broker:表示消息隊列服務器實體;

技術分享圖片

技術分享圖片

3. RabbitMQ 整合(SpringBoot)

  • 自動配置:
    • RabbitAutoConfiguration
    • 自動配置了連接工廠ConnectionFactory
      ;
    • RabbitProperties封裝了RabbitMQ的配置;
    • RabbitTemplate:給RabbitMQ發送和接收消息;
    • AmpqAdmin:RabbitMQ系統管理功能組件;
    • @EnableRabbit:開啟基於註解的RabbitMQ模式;
    • @EnableRabbit@RabbitListener用於監聽消息隊列的內容;
// application.properties 配置文件
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


// 測試類
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests{

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void contextLoads(){
        // 點對點消息
        // Message 需要自己構造一個,定義消息體內容和消息頭
        // rabbitTemplate.send(exchange, routeKey, message);

        // rabbitTemplate.convertAndSend(exchange, routeKey, object)
        // 只需要傳入要發送的對象, 會自動序列化發送給rabbitmq, object 默認當成消息體

        Map<String, Object> map = new HashMap<>();
        map.put("msg","匆匆的我來了...");
        map.put("data",Arrays.asList("777477",232,true));

        rabbitTemplate.convertAndSend("exchange.direct", "atnoodles.news",map);
    }   

    // 接收消息
    @Test
    public void receive(){
        Object o = rabbitTemplate.receiveAndConvert("atnoodles.news");
        System.out.println(o.getClass());       // Class java.util.HashMap
        System.our.println(o);
    }
}


// 如果需要將發送的數據自動轉換為JSON,發送出去
// com.noodles.springboot.rabbitmq.config.MyAMQPConfig.java

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyAMQPConfig{
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

4. AmpqAdmin

  • 創建和刪除 Queue, Exchange, Binding
// 測試類
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests{

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){
        // 創建 Exchange
        amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));
        System.out.println("創建完成...");

        // 創建 Queue
        amqpAdmin.declareQueue(new Queue("amqpAdmin.queue", true));
    }
}

參考資料:

  • SpringBoot與消息
  • RabbitMQ 安裝(Windows)
  • Mac 安裝RabbitMQ
  • 啟動/停止rabbitMQ

SpringBoot與消息(RabbitMQ)