1. 程式人生 > >RabbiMQ基礎以及spring-boot-starter-amqp使用

RabbiMQ基礎以及spring-boot-starter-amqp使用

outb convert net ESS XML fig vhost chan 多個

? RabbitMQ是一種基於amq協議的消息隊列,本文主要記錄一下rabbitmq的基礎內容以及使用spring-boot-starter-amqp操作rabbitmq。

1,rabbitmq中的幾個重要概念

a) 虛擬主機(vhost)

? 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。虛擬主機的作用在於進行權限管控,rabbitmq默認有一個虛擬主機"/"。可以使用rabbitmqctl add_vhost命令添加虛擬主機,然後使用rabbitmqctl set_permissions命令設置指定用戶在指定虛擬主機下的權限,以此達到權限管控的目的。

b) 消息通道(channel)

消息通道:  在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。

c) 交換機(exchange)

? 交換機: exchange的功能是用於消息分發,它負責接收消息並轉發到與之綁定的隊列,exchange不存儲消息,如果一個exchange沒有binding任何Queue,那麽當它會丟棄生產者發送過來的消息,在啟用ACK機制後,如果exchange找不到隊列,則會返回錯誤。一個exchange可以和多個Queue進行綁定。

交換機有四種類型:

  • 路由模式(Direct):

    ?direct 類型的行為是"先匹配, 再投送"。即在綁定時設定一個 routing_key, 消息的routing_key 匹配時, 才會被交換器投送到綁定的隊列中去。direct是rabbitmq的默認交換機類型。

  • 通配符模式(Topic):

    ?類似路由模式,但是routing_key支持模糊匹配,按規則轉發消息(最靈活)。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。

  • 發布訂閱模式(Fanout):

    ?轉發消息到所有綁定隊列,忽略routing_key。

  • Headers:

? 設置header attribute參數類型的交換機。相較於 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規則的類型,忽略routing_key。在隊列與交換器綁定時, 會設定一組鍵值對規則, 消息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或全部匹配時, 消息被投送到對應隊列。
? 在綁定Queue與Exchange時指定一組鍵值對,當消息發送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進行匹配。如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列。headers屬性是一個鍵值對,可以是Hashtable,鍵值對的值可以是任何類型。

匹配規則x-match有下列兩種類型:

x-match = all :表示所有的鍵值對都匹配才能接受到消息
x-match = any :表示只要有鍵值對匹配就能接受到消息

2,使用spring-boot-starter-amqp操作rabbitmq

首先添加相關依賴:

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
      </dependency>

application.properties中配置rabbitmq相關配置:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=cord
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

定義rabbitmq配置類:

RabbitMQConfig.java

@Configuration
public class RabbitMQConfig {

    private static final String topicExchangeName = "topic-exchange";
    private static final String fanoutExchange = "fanout-exchange";
    private static final String headersExchange = "headers-exchange";

    private static final String queueName = "cord";

    //聲明隊列
    @Bean
    public Queue queue() {
        //Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
        return new Queue("cord", false, true, true);
    }

    //聲明Topic交換機
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(topicExchangeName);
    }

    //將隊列與Topic交換機進行綁定,並指定路由鍵
    @Bean
    Binding topicBinding(Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("org.cord.#");
    }

    //聲明fanout交換機
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }

    //將隊列與fanout交換機進行綁定
    @Bean
    Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    //聲明Headers交換機
    @Bean
    HeadersExchange headersExchange() {
        return new HeadersExchange(headersExchange);
    }

    //將隊列與headers交換機進行綁定
    @Bean
    Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
        Map<String, Object> map = new HashMap<>();
        map.put("First","A");
        map.put("Fourth","D");
        //whereAny表示部分匹配,whereAll表示全部匹配
//        return BindingBuilder.bind(queue).to(headersExchange).whereAll(map).match();
        return BindingBuilder.bind(queue).to(headersExchange).whereAny(map).match();
    }
}

定義生產者:

Producer.java

@Component
public class Producer {

    @Autowired
    private AmqpTemplate template;

    @Autowired
    private AmqpAdmin admin;

    /**
     * @param routingKey 路由關鍵字
     * @param msg 消息體
     */
    public void sendDirectMsg(String routingKey, String msg) {
        template.convertAndSend(routingKey, msg);
    }

    /**
     * @param routingKey 路由關鍵字
     * @param msg 消息體
     * @param exchange 交換機
     */
    public void sendExchangeMsg(String exchange, String routingKey, String msg) {
        template.convertAndSend(exchange, routingKey, msg);
    }

    /**
     * @param map 消息headers屬性
     * @param exchange 交換機
     * @param msg 消息體
     */
    public void sendHeadersMsg(String exchange, String msg, Map<String, Object> map) {
        template.convertAndSend(exchange, null, msg, message -> {
            message.getMessageProperties().getHeaders().putAll(map);
            return message;
        });
    }
}

定義消費者:

Consumer.class

@Component
public class Consumer {
  
    @RabbitListener(queues = "cord")
    //@RabbitListener(queues = "cord", containerFactory="myFactory")
    public void processMessage(String msg) {
        System.out.format("Receiving Message: -----[%s]----- \n.", msg);
    }
}

測試用例:

RabbitmqTest.java

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = CordApplication.class)
public class RabbitmqTest {

    @Autowired
    private Producer producer;

    //Direct
    @Test
    public void sendDirectMsg() {
        producer.sendDirectMsg("cord", String.valueOf(System.currentTimeMillis()));
    }

    //Topic
    @Test
    public void sendtopicMsg() {
        producer.sendExchangeMsg("topic-exchange","org.cord.test", "hello world");
    }

    //Fanout
    @Test
    public void sendFanoutMsg() {
        producer.sendExchangeMsg("fanout-exchange", "abcdefg", String.valueOf(System.currentTimeMillis()));
    }

    //Headers
    @Test
    public void sendHeadersMsg() {
        Map<String, Object> map = new HashMap<>();
        map.put("First","A");
        producer.sendHeadersMsg("headers-exchange", "hello word", map);
    }
}

https://spring.io/guides/gs/messaging-rabbitmq/

https://blog.csdn.net/qq1052441272/article/details/53940754

https://stackoverflow.com/questions/19240290/how-do-i-implement-headers-exchange-in-rabbitmq-using-java

https://blog.csdn.net/ztx114/article/details/78410727

https://www.cnblogs.com/jfl-xx/p/7324285.html

RabbiMQ基礎以及spring-boot-starter-amqp使用