1. 程式人生 > 其它 >微服務:MQ高階特性

微服務:MQ高階特性

死信交換機

死信

什麼是死信?

當一個佇列中的訊息滿足下列情況之一時,可以成為死信(dead letter):

  • 消費者使用basic.reject或 basic.nack宣告消費失敗,並且訊息的requeue引數設定為false
  • 訊息是一個過期訊息,超時無人消費
  • 要投遞的佇列訊息滿了,無法投遞

如果這個包含死信的佇列配置了dead-letter-exchange屬性,指定了一個交換機,那麼佇列中的死信就會投遞到這個交換機中,而這個交換機稱為死信交換機(Dead Letter Exchange,檢查DLX)。

如圖,一個訊息被消費者拒絕了,變成了死信:

因為simple.queue綁定了死信交換機 dl.direct,因此死信會投遞給這個交換機:

如果這個死信交換機也綁定了一個佇列,則訊息最終會進入這個存放死信的佇列:

另外,佇列將死信投遞給死信交換機時,必須知道兩個資訊:

  • 死信交換機名稱
  • 死信交換機與死信佇列繫結的RoutingKey

這樣才能確保投遞的訊息能到達死信交換機,並且正確的路由到死信佇列。

在失敗重試策略中,預設的RejectAndDontRequeueRecoverer會在本地重試次數耗盡後,傳送reject給RabbitMQ,訊息變成死信,被丟棄。

我們可以給simple.queue新增一個死信交換機,給死信交換機繫結一個佇列。這樣訊息變成死信後也不會丟棄,而是最終投遞到死信交換機,路由到與死信交換機繫結的佇列。

我們在consumer服務中,定義一組死信交換機、死信佇列:

// 宣告普通的 simple.queue佇列,並且為其指定死信交換機:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定佇列名稱,並持久化
        .deadLetterExchange("dl.direct") // 指定死信交換機
        .build();
}
// 宣告死信交換機 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 宣告儲存死信的佇列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 將死信佇列 與 死信交換機繫結
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

小結

什麼樣的訊息會成為死信?

  • 訊息被消費者reject或者返回nack
  • 訊息超時未消費
  • 佇列滿了

死信交換機的使用場景是什麼?

  • 如果佇列綁定了死信交換機,死信會投遞到死信交換機;
  • 可以利用死信交換機收集所有消費者處理失敗的訊息(死信),交由人工處理,進一步提高訊息佇列的可靠性。

TTL

一個佇列中的訊息如果超時未消費,則會變為死信,超時分為兩種情況:

  • 訊息所在的佇列設定了超時時間
  • 訊息本身設定了超時時間

實現TTL

實現TTL

//編寫ttl.direct、ttl.queue並繫結死信交換機
@Configuration
public class TTLMessageConf {
    /**
     * 建立了一個綁定了死信交換機的佇列
     * @return
     */
    @Bean
    public Queue ddlQueue(){
        return QueueBuilder.durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }

    /**
     * 這是一個交換機
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("ttl.direct");
    }

    /**
     * 將佇列和交換機繫結
     * @return
     */
    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ddlQueue()).to(directExchange()).with("ttl");
    }
}
//建立私信交換機、佇列和消費者
@RabbitListener(bindings = {
    @QueueBinding(
        value = @Queue(name = "dl.queue",durable = "true"),
        exchange = @Exchange(name = "dl.direct",durable = "true"),
        key = "dl"
    )
})
public void listenDlQueue(String msg){
    log.info("消費者接收到simple.queue的訊息:{}",msg);
}

在傳送訊息時,也可以指定TTL:

@Test
public void testTTLMsg() {
    // 建立訊息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
    // 訊息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 傳送訊息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("傳送訊息成功");
}

延遲佇列

利用TTL結合死信交換機,我們實現了訊息發出後,消費者延遲收到訊息的效果。這種訊息模式就稱為延遲佇列(Delay Queue)模式。

延遲佇列的使用場景包括:

  • 延遲傳送簡訊
  • 使用者下單,如果使用者在15 分鐘內未支付,則自動取消
  • 預約工作會議,20分鐘後自動通知所有參會人員

DelayExchange原理

DelayExchange需要將一個交換機宣告為delayed型別。當我們傳送訊息到delayExchange時,流程如下:

  • 接收訊息
  • 判斷訊息是否具備x-delay屬性
  • 如果有x-delay屬性,說明是延遲訊息,持久化到硬碟,讀取x-delay值,作為延遲時間
  • 返回routing not found結果給訊息傳送者
  • x-delay時間到期後,重新投遞訊息到指定佇列

使用SpirngAMQP實現DelayExchange

//宣告延遲交換機
//1.基於註解的方式宣告延遲交換機
@RabbitListener(bindings = {
    @QueueBinding(
        value = @Queue(name = ""),
        exchange = @Exchange(name = "",delayed = "true"),//這裡選擇delay屬性為true
        key = "hello"
    )
})
public void listenDelay(){

}
//2.基於@Bean的方式宣告延遲交換機
@Bean
public DirectExchange delayExchange(){
    return ExchangeBuilder.directExchange("delay")
        .delayed()//使用delayed
        .durable(true)
        .build();
}

傳送訊息

傳送訊息時,一定要攜帶x-delay屬性,指定延遲的時間:

總結

延遲佇列外掛的使用步驟包括哪些?

•宣告一個交換機,新增delayed屬性為true

•傳送訊息時,新增x-delay頭,值為超時時間


惰性佇列

訊息堆積

當生產者傳送訊息的速度超過了消費者處理訊息的速度,就會導致佇列中的訊息堆積,直到佇列儲存訊息達到上限。之後傳送的訊息就會成為死信,可能會被丟棄,這就是訊息堆積問題。

解決訊息堆積有兩種思路:

  • 增加更多消費者,提高消費速度。也就是我們之前說的work queue模式
  • 擴大佇列容積,提高堆積上限

要提升佇列容積,把訊息儲存在記憶體中顯然是不行的。

惰性佇列

從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性佇列。惰性佇列的特徵如下:

  • 接收到訊息後直接存入磁碟而非記憶體
  • 消費者要消費訊息時才會從磁碟中讀取並載入到記憶體
  • 支援數百萬條的訊息儲存

基於命令列設定lazy-queue

而要設定一個佇列為惰性佇列,只需要在宣告佇列時,指定x-queue-mode屬性為lazy即可。可以通過命令列將一個執行中的佇列修改為惰性佇列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解讀:

  • rabbitmqctl :RabbitMQ的命令列工具
  • set_policy :新增一個策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$" :用正則表示式匹配佇列的名字
  • '{"queue-mode":"lazy"}' :設定佇列模式為lazy模式
  • --apply-to queues :策略的作用物件,是所有的佇列

宣告惰性佇列

  • 基於Bean的方式
@Bean
public Queue lazyQueue(){
    return QueueBuilder.durable("lazyQueue")
            .lazy()
            .build();
}
  • 基於註解的方式
@RabbitListener(bindings = {
    @QueueBinding(
        value = @Queue(name=""),
        exchange = @Exchange(name=""),
        key = "hello",
        arguments = @Argument(name = "x-queue-mode",value = "lazy")
    )
})
public void lazyQueue(){

}

總結

訊息堆積問題的解決方案?

  • 佇列上繫結多個消費者,提高消費速度
  • 使用惰性佇列,可以再mq中儲存更多訊息

惰性佇列的優點有哪些?

  • 基於磁碟儲存,訊息上限高
  • 沒有間歇性的page-out,效能比較穩定

惰性佇列的缺點有哪些?

  • 基於磁碟儲存,訊息時效性會降低
  • 效能受限於磁碟的IO

MQ叢集

叢集分類

RabbitMQ的是基於Erlang語言編寫,而Erlang又是一個面向併發的語言,天然支援叢集模式。RabbitMQ的叢集有兩種模式:

普通叢集:是一種分散式叢集,將佇列分散到叢集的各個節點,從而提高整個叢集的併發能力。

映象叢集:是一種主從叢集,普通叢集的基礎上,添加了主從備份功能,提高叢集的資料可用性。

映象叢集雖然支援主從,但主從同步並不是強一致的,某些情況下可能有資料丟失的風險。因此在RabbitMQ的3.8版本以後,推出了新的功能:仲裁佇列來代替映象叢集,底層採用Raft協議確保主從的資料一致性。

普通叢集

普通叢集,或者叫標準叢集(classic cluster),具備下列特徵:

  • 會在叢集的各個節點間共享部分資料,包括:交換機、佇列元資訊。不包含佇列中的訊息。
  • 當訪問叢集某節點時,如果佇列不在該節點,會從資料所在節點傳遞到當前節點並返回
  • 佇列所在節點宕機,佇列中的訊息就會丟失

結構如圖:

普通叢集的搭建

在RabbitMQ的官方文件中,講述了兩種叢集的配置方式:

  • 普通模式:普通模式叢集不進行資料同步,每個MQ都有自己的佇列、資料資訊(其它元資料資訊如交換機等會同步)。例如我們有2個MQ:mq1,和mq2,如果你的訊息在mq1,而你連線到了mq2,那麼mq2會去mq1拉取訊息,然後返回給你。如果mq1宕機,訊息就會丟失。
  • 映象模式:與普通模式不同,佇列會在各個mq的映象節點之間同步,因此你連線到任何一個映象節點,均可獲取到訊息。而且如果一個節點宕機,並不會導致資料丟失。不過,這種方式增加了資料同步的頻寬消耗。

我們先來看普通模式叢集,我們的計劃部署3節點的mq叢集:

主機名 控制檯埠 amqp通訊埠
mq1 8081 ---> 15672 8071 ---> 5672
mq2 8082 ---> 15672 8072 ---> 5672
mq3 8083 ---> 15672 8073 ---> 5672

叢集中的節點標示預設都是:rabbit@[hostname],因此以上三個節點的名稱分別為:

  • rabbit@mq1
  • rabbit@mq2
  • rabbit@mq3

獲取cookie

RabbitMQ底層依賴於Erlang,而Erlang虛擬機器就是一個面向分散式的語言,預設就支援叢集模式。叢集模式中的每個RabbitMQ 節點使用 cookie 來確定它們是否被允許相互通訊。

要使兩個節點能夠通訊,它們必須具有相同的共享祕密,稱為Erlang cookie。cookie 只是一串最多 255 個字元的字母數字字元。

每個叢集節點必須具有相同的 cookie。例項之間也需要它來相互通訊。

我們先在之前啟動的mq容器中獲取一個cookie值,作為叢集的cookie。執行下面的命令:

docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

可以看到cookie值如下:

ABSDWSBQLMYQZPLNPILH

接下來,停止並刪除當前的mq容器,我們重新搭建叢集。

docker rm -f mq

準備叢集配置

在/tmp目錄新建一個配置檔案 rabbitmq.conf:

cd /tmp
# 建立檔案
touch rabbitmq.conf

檔案內容如下:

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

再建立一個檔案,記錄cookie

cd /tmp
# 建立cookie檔案
touch .erlang.cookie
# 寫入cookie
echo "ABSDWSBQLMYQZPLNPILH" > .erlang.cookie
# 修改cookie檔案的許可權
chmod 600 .erlang.cookie

準備三個目錄,mq1、mq2、mq3:

cd /tmp
# 建立目錄
mkdir mq1 mq2 mq3

然後拷貝rabbitmq.conf、cookie檔案到mq1、mq2、mq3:

# 進入/tmp
cd /tmp
# 拷貝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3

啟動叢集

建立一個網路:

docker network create mq-net

執行命令

docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management

映象叢集

在剛剛的案例中,一旦建立佇列的主機宕機,佇列就會不可用。不具備高可用能力。如果要解決這個問題,必須使用官方提供的映象叢集方案。

官方文件地址:https://www.rabbitmq.com/ha.html

映象模式的特徵

預設情況下,佇列只儲存在建立該佇列的節點上。而映象模式下,建立佇列的節點被稱為該佇列的主節點,佇列還會拷貝到叢集中的其它節點,也叫做該佇列的映象節點。

但是,不同佇列可以在叢集中的任意節點上建立,因此不同佇列的主節點可以不同。甚至,一個佇列的主節點可能是另一個佇列的映象節點

使用者傳送給佇列的一切請求,例如傳送訊息、訊息回執預設都會在主節點完成,如果是從節點接收到請求,也會路由到主節點去完成。映象節點僅僅起到備份資料作用

當主節點接收到消費者的ACK時,所有映象都會刪除節點中的資料。

總結如下:

  • 映象佇列結構是一主多從(從就是映象)
  • 所有操作都是主節點完成,然後同步給映象節點
  • 主宕機後,映象節點會替代成新的主(如果在主從同步完成前,主就已經宕機,可能出現數據丟失)
  • 不具備負載均衡功能,因為所有操作都會有主節點完成(但是不同佇列,其主節點可以不同,可以利用這個提高吞吐量)

映象叢集的搭建

映象模式的配置有3種模式:

ha-mode ha-params 效果
準確模式exactly 佇列的副本量count 叢集中佇列副本(主伺服器和映象伺服器之和)的數量。count如果為1意味著單個副本:即佇列主節點。count值為2表示2個副本:1個佇列主和1個佇列映象。換句話說:count = 映象數量 + 1。如果群集中的節點數少於count,則該佇列將映象到所有節點。如果有叢集總數大於count+1,並且包含映象的節點出現故障,則將在另一個節點上建立一個新的映象。
all (none) 佇列在群集中的所有節點之間進行映象。佇列將映象到任何新加入的節點。映象到所有節點將對所有群集節點施加額外的壓力,包括網路I / O,磁碟I / O和磁碟空間使用情況。推薦使用exactly,設定副本數為(N / 2 +1)。
nodes node names 指定佇列建立到哪些節點,如果指定的節點全部不存在,則會出現異常。如果指定的節點在叢集中存在,但是暫時不可用,會建立節點到當前客戶端連線到的節點。

這裡我們以rabbitmqctl命令作為案例來講解配置語法。

語法示例:

exactly模式

rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • rabbitmqctl set_policy:固定寫法
  • ha-two:策略名稱,自定義
  • "^two\.":匹配佇列的正則表示式,符合命名規則的佇列才生效,這裡是任何以two.開頭的佇列名稱
  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略內容
    • "ha-mode":"exactly":策略模式,此處是exactly模式,指定副本數量
    • "ha-params":2:策略引數,這裡是2,就是副本數量為2,1主1映象
    • "ha-sync-mode":"automatic":同步策略,預設是manual,即新加入的映象節點不會同步舊的訊息。如果設定為automatic,則新加入的映象節點會把主節點中所有訊息都同步,會帶來額外的網路開銷

all模式

rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名稱,自定義
  • "^all\.":匹配所有以all.開頭的佇列名
  • '{"ha-mode":"all"}':策略內容
    • "ha-mode":"all":策略模式,此處是all模式,即所有節點都會稱為映象節點

nodes模式

rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy:固定寫法
  • ha-nodes:策略名稱,自定義
  • "^nodes\.":匹配佇列的正則表示式,符合命名規則的佇列才生效,這裡是任何以nodes.開頭的佇列名稱
  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略內容
    • "ha-mode":"nodes":策略模式,此處是nodes模式
    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略引數,這裡指定副本所在節點名稱

仲裁佇列

從RabbitMQ 3.8版本開始,引入了新的仲裁佇列,他具備與映象隊裡類似的功能,但使用更加方便。

叢集特徵

仲裁佇列:仲裁佇列是3.8版本以後才有的新功能,用來替代映象佇列,具備下列特徵:

  • 與映象佇列一樣,都是主從模式,支援主從資料同步
  • 使用非常簡單,沒有複雜的配置
  • 主從同步基於Raft協議,強一致

仲裁佇列搭建

在任意控制檯新增一個佇列,一定要選擇佇列型別為Quorum型別。

在任意控制檯檢視佇列:

可以看到,仲裁佇列的 + 2字樣。代表這個佇列有2個映象節點。

因為仲裁佇列預設的映象數為5。如果你的叢集有7個節點,那麼映象數肯定是5;而我們叢集只有3個節點,因此映象數量就是3.

Java程式碼建立仲裁佇列

@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue") // 持久化
        .quorum() // 仲裁佇列
        .build();
}

SpringAMQP連線MQ叢集

注意,這裡用address來代替host、port方式

spring:
  rabbitmq:
    addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
    username: itcast
    password: 123321
    virtual-host: /