1. 程式人生 > >分散式訊息通訊RabbitMQ工作模型

分散式訊息通訊RabbitMQ工作模型

本章重點:

1.三種主要的交換機介紹

2.SpringBoot整合RabbitMQ三種交換機

3.死信佇列

4.優先順序佇列和訊息

5.服務端流控

6.消費端限流

RabbitMQ的特性 
RabbitMQ使用Erlang語言編寫,使用Mnesia資料庫儲存訊息。

1.可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。 
2.靈活的路由(Flexible Routing) 在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功 能,RabbitMQ 已經提供了一些內建的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 繫結在 一起,也通過外掛機制實現自己的 Exchange 。
3.訊息叢集(Clustering) 多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。 
4.高可用(Highly Available Queues) 佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下 佇列仍然可用。
5.多種協議(Multi-protocol) RabbitMQ 支援多種訊息佇列協議,比如 AMQP、STOMP、MQTT 等等。
6.多語言客戶端(Many Clients) RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby、PHP、C#、 JavaScript 等等。 
7.管理介面(Management UI) RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息、叢集 中的節點。
8.外掛機制(Plugin System) RabbitMQ提供了許多外掛,以實現從多方面擴充套件,當然也可以編寫自己的外掛。

工作模型

RabbitMQ的術語介紹

Broker:即RabbitMQ的實體伺服器。提供一種傳輸服務,維護一條從生產者到消費者的傳輸線路, 保證訊息資料能按照指定的方式傳輸。
Exchange:訊息交換機。指定訊息按照什麼規則路由到哪個佇列Queue。
Queue:訊息佇列。訊息的載體,每條訊息都會被投送到一個或多個佇列中。
Binding:繫結。作用就是將Exchange和Queue按照某種路由規則繫結起來。
Routing Key:路由關鍵字。Exchange根據Routing Key進行訊息投遞。定義繫結時指定的關鍵字稱為 Binding Key。
Vhost:虛擬主機。一個Broker可以有多個虛擬主機,用作不同使用者的許可權分離。一個虛擬主機持有 一組Exchange、Queue和Binding。
Producer:訊息生產者。主要將訊息投遞到對應的Exchange上面。一般是獨立的程式。
Consumer:訊息消費者。訊息的接收者,一般是獨立的程式。
Connection: Producer 和 Consumer 與Broker之間的TCP長連線。
Channel:訊息通道,也稱通道。在客戶端的每個連線裡可以建立多個Channel,每個Channel代表一 個會話任務。在RabbitMQ Java Client API中,channel上定義了大量的程式設計介面。

三種主要的交換機

  1. Direct Exchange 直連交換機 
  2. Topic Exchange 主題交換機 
  3. Fanout Exchange 廣播交換機 

RabbitMQ基本使用:

本文直接使用SpringBoot來進行開發.

訊息消費者:

專案結構:

新增pom依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

新增連線配置  application.properties

spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

建立配置類:RabbitConfig

在這裡建立了三種類型的交換機,四個佇列

package com.zbb.rabbitmq_consumer.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * 配置類,需要使用configuration註解
 */
@Configuration
public class RabbitConfig {
    //定義三個交換機
 
    /**
     * 直連交換機
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("DIRECT_EXCHANGE");
    }
 
    /**
     * 主題交換機
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("TOPIC_EXCHANGE");
    }
 
    /**
     * 廣播交換機
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("FANOUT_EXCHANGE");
    }
    //定義四個佇列
    @Bean
    public Queue firsQueue(){
        return new Queue("FIST_QUEUE");
    }
 
    @Bean
    public Queue secondQueue(){
        return new Queue("SECOND_QUEUE");
    }
 
    @Bean
    public Queue thirdQueue(){
        return new Queue("THIRD_QUEUE");
    }
 
    @Bean
    public Queue fourthQueue(){
        return new Queue("FOURTH_QUEUE");
    }
 
    //定義四個繫結關係
    @Bean
    public Binding bindFirst(@Qualifier("firsQueue") Queue queue,
                             @Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("zbb.test");
    }
 
    @Bean
    public Binding bindSecond(@Qualifier("secondQueue") Queue queue,
                              @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("*.zbb.*");
    }
 
    @Bean
    public Binding bindThird(@Qualifier("thirdQueue") Queue queue,
                             @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
 
    @Bean
    public Binding bindFourth(@Qualifier("fourthQueue") Queue queue,
                              @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
 
}

建立消費者,四個,分別監聽4個佇列

1、FirstConsumer
 

package com.zbb.rabbitmq_consumer.consumer;
 
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
@RabbitListener(queues = "FIST_QUEUE")
public class FirstConsumer {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("消費者一收到訊息:"+msg);
    }
}

2、SecondConsumer
 

package com.zbb.rabbitmq_consumer.consumer;
 
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
@RabbitListener(queues = "SECOND_QUEUE")
public class SecondConsumer {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("消費者二收到訊息:"+msg);
    }
}

3、ThirdConsumer

package com.zbb.rabbitmq_consumer.consumer;
 
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
@RabbitListener(queues = "THIRD_QUEUE")
public class ThirdConsumer {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("消費者三收到訊息:"+msg);
    }
}

4、FourthConsumer
 

package com.zbb.rabbitmq_consumer.consumer;
 
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
@RabbitListener(queues = "FOURTH_QUEUE")
public class FourthConsumer {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("消費者四收到訊息:"+msg);
    }
}

訊息生產者:

引入pom依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

建立生產者類,給三個交換機發送訊息

package com.zbb.rabbitmq_producer.producer;
 
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
@Component
public class MyProducer {
 
    @Autowired
    RabbitTemplate rabbitTemplate;
 
    public void send(){
        //直連
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "zbb.test",
                "this is a direct msg");
        //主題
        rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","kaifa.zbb.IT", 
                "this is a Topic msg");
 
        //廣播
        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "",
                "this is a fanout msg");
    }
}

修改測試類:

package com.zbb.rabbitmq_producer;
 
import com.zbb.rabbitmq_producer.producer.MyProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqProducerApplicationTests {
 
    @Autowired
    MyProducer myProducer;
 
    @Test
    public void contextLoads() {
        myProducer.send();
    }
 
}
 
  1. 我們先啟動訊息生產者,開啟管理介面可以看到四個佇列裡都有一條訊息堆積

    2.現在去啟動訊息消費者之後會發現控制檯打印出訊息,並且管理介面發生改變:

這個就是RabbitMQ在java中的簡單使用,具體在專案中的使用需要結合業務來進行訊息的收發。

下面將介紹一下RabbitMQ中一個重要的東西:交換機Exchange

這裡只介紹三種主要的交換機。

Direct Exchange 直連交換機

直連型別的交換機與一個佇列繫結時,需要指定一個明確的繫結關鍵字(binding key)。
路由規則:傳送訊息到直連型別的交換機時,只有路由關鍵字(routing key)跟繫結關鍵字(binding key)完全匹配時,繫結的佇列才能收到訊息。

佇列:

@Bean
public Queue firsQueue(){
    return new Queue("FIST_QUEUE");
}
交換機:

@Bean
public DirectExchange directExchange(){
    return new DirectExchange("DIRECT_EXCHANGE");
}
繫結:

@Bean
public Binding bindFirst(@Qualifier("firsQueue") Queue queue,
                         @Qualifier("directExchange") DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("zbb.test");
}
傳送訊息:

rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "zbb.test",
                "this is a direct msg");


Topic Exchange 主題交換機

定義:主題型別的交換機與一個佇列繫結時,可以指定按模式匹配的routing key。
萬用字元有兩個,*代表匹配一個單詞。#代表匹配零個或者多個單詞。單詞與單詞之間用 . 隔開。
路由規則:傳送訊息到主題型別的交換機時,routing key符合binding key的模式時,繫結的佇列才能收到訊息

佇列:

@Bean
public Queue secondQueue(){
    return new Queue("SECOND_QUEUE");
}
交換機:

@Bean
public TopicExchange topicExchange(){
    return new TopicExchange("TOPIC_EXCHANGE");
}
繫結:

@Bean
public Binding bindSecond(@Qualifier("secondQueue") Queue queue,
                          @Qualifier("topicExchange") TopicExchange topicExchange) {
    return BindingBuilder.bind(queue).to(topicExchange).with("*.zbb.*");
}
傳送訊息:

//主題
rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","kaifa.zbb.IT",
                              "this is a Topic msg");


Fanout Exchange 廣播交換機,繫結兩個佇列thirdQueue   fourthQueue

定義:廣播型別的交換機與一個佇列繫結時,不需要指定binding key。
路由規則:當訊息傳送到廣播型別的交換機時,不需要指定routing key,所有與之繫結的佇列都能收到訊息。

佇列:

@Bean
public Queue thirdQueue(){
    return new Queue("THIRD_QUEUE");
}
@Bean
public Queue fourthQueue(){
    return new Queue("FOURTH_QUEUE");
}
交換機:

@Bean
public FanoutExchange fanoutExchange(){
    return new FanoutExchange("FANOUT_EXCHANGE");
}
傳送訊息:

//廣播
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "",
                "this is a fanout msg");

進階知識 1、TTL(Time To Live) a、訊息的過期時間 
有兩種設定方式:
通過佇列屬性設定訊息過期時間:

Map<String, Object> argss = new HashMap<String, Object>(); 
                argss.put("x-message-ttl",6000);
 
                channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);

設定單條訊息的過期時間:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                                 // 持久化訊息  
                                 .deliveryMode
                                 .contentEncoding("UTF-8")       
                                 .expiration("10000") // TTL        
                                 .build();
channel.basicPublish("", "TEST_TTL_QUEUE", properties, msg.getBytes());

b、佇列的過期時間:

Map<String, Object> argss = new HashMap<String, Object>(); 
argss.put("x-message-ttl",6000);
 
channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);

佇列的過期時間決定了在沒有任何消費者以後,佇列可以存活多久。

2、死信佇列 
有三種情況訊息會進入DLX(Dead Letter Exchange)死信交換機。

  1. (NACK || Reject ) && requeue == false
  2. 訊息過期
  3. 佇列達到最大長度(先入隊的訊息會被髮送到DLX)

可以設定一個死信佇列(Dead Letter Queue)與DLX繫結,即可以儲存Dead Letter,消費者可以監聽這個佇列取走訊息。

Map<String,Object> arguments = new HashMap<String,Object>(); 
arguments.put("x-dead-letter-exchange","DLX_EXCHANGE"); 
// 指定了這個佇列的死信交換機 
channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments); 
// 宣告死信交換機 
channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null); 
// 宣告死信佇列 
channel.queueDeclare("DLX_QUEUE", false, false, false, null); 
// 繫結 
channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");

3、優先順序佇列 
設定一個佇列的最大優先順序:

Map<String, Object> argss = new HashMap<String, Object>(); 
argss.put("x-max-priority",10);  
// 佇列最大優先順序
channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss);

傳送訊息時指定訊息當前的優先順序:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()        
.priority(5)  
// 訊息優先順序        
.build();
channel.basicPublish("", "ORIGIN_QUEUE", properties, msg.getBytes());

優先順序高的訊息可以優先被消費,但是:只有訊息堆積(訊息的傳送速度大於消費者的消費速度)的情況下優先順序 才有意義。
4、延遲佇列 
RabbitMQ本身不支援延遲佇列。可以使用TTL結合DLX的方式來實現訊息的延遲投遞,即把DLX跟某個佇列繫結, 到了指定時間,訊息過期後,就會從DLX路由到這個佇列,消費者可以從這個佇列取走訊息。 另一種方式是使用rabbitmq-delayed-message-exchange外掛。
當然,將需要傳送的資訊儲存在資料庫,使用任務排程系統掃描然後傳送也是可以實現的。

5、RPC 
RabbitMQ實現RPC的原理:服務端處理訊息後,把響應訊息傳送到一個響應佇列,客戶端再從響應佇列取到結 果。
其中的問題:Client收到訊息後,怎麼知道應答訊息是回覆哪一條訊息的?所以必須有一個唯一ID來關聯,就是 correlationId。


6、服務端流控(Flow Control) 
 

RabbitMQ 會在啟動時檢測機器的實體記憶體數值。預設當 MQ 佔用 40% 以上記憶體時,MQ 會主動丟擲一個記憶體警告並阻塞所有連線(Connections)。

可以通過修改 rabbitmq.config 檔案來調整記憶體閾值,預設值是 0.4,如下 所示:   [{rabbit, [{vm_memory_high_watermark, 0.4}]}]. 預設情況,如果剩餘磁碟空間在 1GB 以下,RabbitMQ 主動阻塞所有的生產者。這個閾值也是可調的。
注意佇列長度只在訊息堆積的情況下有意義,而且會刪除先入隊的訊息,不能實現服務端限流。

7、消費端限流 
在AutoACK為false的情況下,如果一定數目的訊息(通過基於consumer或者channel設定Qos的值)未被確認 前,不進行消費新的訊息。
 

channel.basicQos(2); 
// 如果超過2條訊息沒有傳送ACK,當前消費者不再接受佇列訊息 
channel.basicConsume(QUEUE_NAME, false, consumer);

UI管理介面的使用 
管理外掛提供了更簡單的管理方式。 啟用管理外掛 
Windows啟用管理外掛 

cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin 
rabbitmq-plugins.bat enable rabbitmq_management

Linux啟用管理外掛 

cd /usr/lib/rabbitmq/bin 
./rabbitmq-plugins enable rabbitmq_management

管理介面訪問埠:

預設埠是15672,預設使用者guest,密碼guest。guest使用者預設只能在本機訪問。