微服務:MQ高階特性
死信交換機
死信
什麼是死信?
當一個佇列中的訊息滿足下列情況之一時,可以成為死信(dead letter):
- 消費者使用basic.reject或 basic.nack宣告消費失敗,並且訊息的requeue引數設定為false
- 訊息是一個過期訊息,超時無人消費
- 要投遞的佇列訊息滿了,無法投遞
如果這個包含死信的佇列配置了dead-letter-exchange
屬性,指定了一個交換機,那麼佇列中的死信就會投遞到這個交換機中,而這個交換機稱為死信交換機(Dead Letter Exchange,檢查DLX)。
如圖,一個訊息被消費者拒絕了,變成了死信:
因為simple.queue綁定了死信交換機 dl.direct,因此死信會投遞給這個交換機:
如果這個死信交換機也綁定了一個佇列,則訊息最終會進入這個存放死信的佇列:
另外,佇列將死信投遞給死信交換機時,必須知道兩個資訊:
- 死信交換機名稱
- 死信交換機與死信佇列繫結的RoutingKey
這樣才能確保投遞的訊息能到達死信交換機,並且正確的路由到死信佇列。
在失敗重試策略中,預設的RejectAndDontRequeueRecoverer會在本地重試次數耗盡後,傳送reject給RabbitMQ,訊息變成死信,被丟棄。
我們可以給simple.queue新增一個死信交換機,給死信交換機繫結一個佇列。這樣訊息變成死信後也不會丟棄,而是最終投遞到死信交換機,路由到與死信交換機繫結的佇列。
我們在consumer服務中,定義一組死信交換機、死信佇列:
// 宣告普通的 simple.queue佇列,並且為其指定死信交換機:dl.direct @Bean public Queue simpleQueue2(){ return QueueBuilder.durable("simple.queue") // 指定佇列名稱,並持久化 .deadLetterExchange("dl.direct") // 指定死信交換機 .build(); } // 宣告死信交換機 dl.direct @Bean public DirectExchange dlExchange(){ return new DirectExchange("dl.direct", true, false); } // 宣告儲存死信的佇列 dl.queue @Bean public Queue dlQueue(){ return new Queue("dl.queue", true); } // 將死信佇列 與 死信交換機繫結 @Bean public Binding dlBinding(){ return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple"); }
小結
什麼樣的訊息會成為死信?
- 訊息被消費者reject或者返回nack
- 訊息超時未消費
- 佇列滿了
死信交換機的使用場景是什麼?
- 如果佇列綁定了死信交換機,死信會投遞到死信交換機;
- 可以利用死信交換機收集所有消費者處理失敗的訊息(死信),交由人工處理,進一步提高訊息佇列的可靠性。
TTL
一個佇列中的訊息如果超時未消費,則會變為死信,超時分為兩種情況:
- 訊息所在的佇列設定了超時時間
- 訊息本身設定了超時時間
實現TTL
實現TTL
//編寫ttl.direct、ttl.queue並繫結死信交換機
@Configuration
public class TTLMessageConf {
/**
* 建立了一個綁定了死信交換機的佇列
* @return
*/
@Bean
public Queue ddlQueue(){
return QueueBuilder.durable("ttl.queue")
.ttl(10000)
.deadLetterExchange("dl.direct")
.deadLetterRoutingKey("dl")
.build();
}
/**
* 這是一個交換機
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("ttl.direct");
}
/**
* 將佇列和交換機繫結
* @return
*/
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ddlQueue()).to(directExchange()).with("ttl");
}
}
//建立私信交換機、佇列和消費者
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "dl.queue",durable = "true"),
exchange = @Exchange(name = "dl.direct",durable = "true"),
key = "dl"
)
})
public void listenDlQueue(String msg){
log.info("消費者接收到simple.queue的訊息:{}",msg);
}
在傳送訊息時,也可以指定TTL:
@Test
public void testTTLMsg() {
// 建立訊息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 訊息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 傳送訊息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("傳送訊息成功");
}
延遲佇列
利用TTL結合死信交換機,我們實現了訊息發出後,消費者延遲收到訊息的效果。這種訊息模式就稱為延遲佇列(Delay Queue)模式。
延遲佇列的使用場景包括:
- 延遲傳送簡訊
- 使用者下單,如果使用者在15 分鐘內未支付,則自動取消
- 預約工作會議,20分鐘後自動通知所有參會人員
DelayExchange原理
DelayExchange需要將一個交換機宣告為delayed型別。當我們傳送訊息到delayExchange時,流程如下:
- 接收訊息
- 判斷訊息是否具備x-delay屬性
- 如果有x-delay屬性,說明是延遲訊息,持久化到硬碟,讀取x-delay值,作為延遲時間
- 返回routing not found結果給訊息傳送者
- x-delay時間到期後,重新投遞訊息到指定佇列
使用SpirngAMQP實現DelayExchange
//宣告延遲交換機
//1.基於註解的方式宣告延遲交換機
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = ""),
exchange = @Exchange(name = "",delayed = "true"),//這裡選擇delay屬性為true
key = "hello"
)
})
public void listenDelay(){
}
//2.基於@Bean的方式宣告延遲交換機
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder.directExchange("delay")
.delayed()//使用delayed
.durable(true)
.build();
}
傳送訊息
傳送訊息時,一定要攜帶x-delay屬性,指定延遲的時間:
總結
延遲佇列外掛的使用步驟包括哪些?
•宣告一個交換機,新增delayed屬性為true
•傳送訊息時,新增x-delay頭,值為超時時間
惰性佇列
訊息堆積
當生產者傳送訊息的速度超過了消費者處理訊息的速度,就會導致佇列中的訊息堆積,直到佇列儲存訊息達到上限。之後傳送的訊息就會成為死信,可能會被丟棄,這就是訊息堆積問題。
解決訊息堆積有兩種思路:
- 增加更多消費者,提高消費速度。也就是我們之前說的work queue模式
- 擴大佇列容積,提高堆積上限
要提升佇列容積,把訊息儲存在記憶體中顯然是不行的。
惰性佇列
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性佇列。惰性佇列的特徵如下:
- 接收到訊息後直接存入磁碟而非記憶體
- 消費者要消費訊息時才會從磁碟中讀取並載入到記憶體
- 支援數百萬條的訊息儲存
基於命令列設定lazy-queue
而要設定一個佇列為惰性佇列,只需要在宣告佇列時,指定x-queue-mode屬性為lazy即可。可以通過命令列將一個執行中的佇列修改為惰性佇列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
-
rabbitmqctl
:RabbitMQ的命令列工具 -
set_policy
:新增一個策略 -
Lazy
:策略名稱,可以自定義 -
"^lazy-queue$"
:用正則表示式匹配佇列的名字 -
'{"queue-mode":"lazy"}'
:設定佇列模式為lazy模式 -
--apply-to queues
:策略的作用物件,是所有的佇列
宣告惰性佇列
- 基於Bean的方式
@Bean
public Queue lazyQueue(){
return QueueBuilder.durable("lazyQueue")
.lazy()
.build();
}
- 基於註解的方式
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name=""),
exchange = @Exchange(name=""),
key = "hello",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
)
})
public void lazyQueue(){
}
總結
訊息堆積問題的解決方案?
- 佇列上繫結多個消費者,提高消費速度
- 使用惰性佇列,可以再mq中儲存更多訊息
惰性佇列的優點有哪些?
- 基於磁碟儲存,訊息上限高
- 沒有間歇性的page-out,效能比較穩定
惰性佇列的缺點有哪些?
- 基於磁碟儲存,訊息時效性會降低
- 效能受限於磁碟的IO
MQ叢集
叢集分類
RabbitMQ的是基於Erlang語言編寫,而Erlang又是一個面向併發的語言,天然支援叢集模式。RabbitMQ的叢集有兩種模式:
•普通叢集:是一種分散式叢集,將佇列分散到叢集的各個節點,從而提高整個叢集的併發能力。
•映象叢集:是一種主從叢集,普通叢集的基礎上,添加了主從備份功能,提高叢集的資料可用性。
映象叢集雖然支援主從,但主從同步並不是強一致的,某些情況下可能有資料丟失的風險。因此在RabbitMQ的3.8版本以後,推出了新的功能:仲裁佇列來代替映象叢集,底層採用Raft協議確保主從的資料一致性。
普通叢集
普通叢集,或者叫標準叢集(classic cluster),具備下列特徵:
- 會在叢集的各個節點間共享部分資料,包括:交換機、佇列元資訊。不包含佇列中的訊息。
- 當訪問叢集某節點時,如果佇列不在該節點,會從資料所在節點傳遞到當前節點並返回
- 佇列所在節點宕機,佇列中的訊息就會丟失
結構如圖:
普通叢集的搭建
在RabbitMQ的官方文件中,講述了兩種叢集的配置方式:
- 普通模式:普通模式叢集不進行資料同步,每個MQ都有自己的佇列、資料資訊(其它元資料資訊如交換機等會同步)。例如我們有2個MQ:mq1,和mq2,如果你的訊息在mq1,而你連線到了mq2,那麼mq2會去mq1拉取訊息,然後返回給你。如果mq1宕機,訊息就會丟失。
- 映象模式:與普通模式不同,佇列會在各個mq的映象節點之間同步,因此你連線到任何一個映象節點,均可獲取到訊息。而且如果一個節點宕機,並不會導致資料丟失。不過,這種方式增加了資料同步的頻寬消耗。
我們先來看普通模式叢集,我們的計劃部署3節點的mq叢集:
主機名 | 控制檯埠 | amqp通訊埠 |
---|---|---|
mq1 | 8081 ---> 15672 | 8071 ---> 5672 |
mq2 | 8082 ---> 15672 | 8072 ---> 5672 |
mq3 | 8083 ---> 15672 | 8073 ---> 5672 |
叢集中的節點標示預設都是:rabbit@[hostname]
,因此以上三個節點的名稱分別為:
- rabbit@mq1
- rabbit@mq2
- rabbit@mq3
獲取cookie
RabbitMQ底層依賴於Erlang,而Erlang虛擬機器就是一個面向分散式的語言,預設就支援叢集模式。叢集模式中的每個RabbitMQ 節點使用 cookie 來確定它們是否被允許相互通訊。
要使兩個節點能夠通訊,它們必須具有相同的共享祕密,稱為Erlang cookie。cookie 只是一串最多 255 個字元的字母數字字元。
每個叢集節點必須具有相同的 cookie。例項之間也需要它來相互通訊。
我們先在之前啟動的mq容器中獲取一個cookie值,作為叢集的cookie。執行下面的命令:
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
可以看到cookie值如下:
ABSDWSBQLMYQZPLNPILH
接下來,停止並刪除當前的mq容器,我們重新搭建叢集。
docker rm -f mq
準備叢集配置
在/tmp目錄新建一個配置檔案 rabbitmq.conf:
cd /tmp
# 建立檔案
touch rabbitmq.conf
檔案內容如下:
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
再建立一個檔案,記錄cookie
cd /tmp
# 建立cookie檔案
touch .erlang.cookie
# 寫入cookie
echo "ABSDWSBQLMYQZPLNPILH" > .erlang.cookie
# 修改cookie檔案的許可權
chmod 600 .erlang.cookie
準備三個目錄,mq1、mq2、mq3:
cd /tmp
# 建立目錄
mkdir mq1 mq2 mq3
然後拷貝rabbitmq.conf、cookie檔案到mq1、mq2、mq3:
# 進入/tmp
cd /tmp
# 拷貝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
啟動叢集
建立一個網路:
docker network create mq-net
執行命令
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management
映象叢集
在剛剛的案例中,一旦建立佇列的主機宕機,佇列就會不可用。不具備高可用能力。如果要解決這個問題,必須使用官方提供的映象叢集方案。
官方文件地址:https://www.rabbitmq.com/ha.html
映象模式的特徵
預設情況下,佇列只儲存在建立該佇列的節點上。而映象模式下,建立佇列的節點被稱為該佇列的主節點,佇列還會拷貝到叢集中的其它節點,也叫做該佇列的映象節點。
但是,不同佇列可以在叢集中的任意節點上建立,因此不同佇列的主節點可以不同。甚至,一個佇列的主節點可能是另一個佇列的映象節點。
使用者傳送給佇列的一切請求,例如傳送訊息、訊息回執預設都會在主節點完成,如果是從節點接收到請求,也會路由到主節點去完成。映象節點僅僅起到備份資料作用。
當主節點接收到消費者的ACK時,所有映象都會刪除節點中的資料。
總結如下:
- 映象佇列結構是一主多從(從就是映象)
- 所有操作都是主節點完成,然後同步給映象節點
- 主宕機後,映象節點會替代成新的主(如果在主從同步完成前,主就已經宕機,可能出現數據丟失)
- 不具備負載均衡功能,因為所有操作都會有主節點完成(但是不同佇列,其主節點可以不同,可以利用這個提高吞吐量)
映象叢集的搭建
映象模式的配置有3種模式:
ha-mode | ha-params | 效果 |
---|---|---|
準確模式exactly | 佇列的副本量count | 叢集中佇列副本(主伺服器和映象伺服器之和)的數量。count如果為1意味著單個副本:即佇列主節點。count值為2表示2個副本:1個佇列主和1個佇列映象。換句話說:count = 映象數量 + 1。如果群集中的節點數少於count,則該佇列將映象到所有節點。如果有叢集總數大於count+1,並且包含映象的節點出現故障,則將在另一個節點上建立一個新的映象。 |
all | (none) | 佇列在群集中的所有節點之間進行映象。佇列將映象到任何新加入的節點。映象到所有節點將對所有群集節點施加額外的壓力,包括網路I / O,磁碟I / O和磁碟空間使用情況。推薦使用exactly,設定副本數為(N / 2 +1)。 |
nodes | node names | 指定佇列建立到哪些節點,如果指定的節點全部不存在,則會出現異常。如果指定的節點在叢集中存在,但是暫時不可用,會建立節點到當前客戶端連線到的節點。 |
這裡我們以rabbitmqctl命令作為案例來講解配置語法。
語法示例:
exactly模式
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
-
rabbitmqctl set_policy
:固定寫法 -
ha-two
:策略名稱,自定義 -
"^two\."
:匹配佇列的正則表示式,符合命名規則的佇列才生效,這裡是任何以two.
開頭的佇列名稱 -
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
: 策略內容-
"ha-mode":"exactly"
:策略模式,此處是exactly模式,指定副本數量 -
"ha-params":2
:策略引數,這裡是2,就是副本數量為2,1主1映象 -
"ha-sync-mode":"automatic"
:同步策略,預設是manual,即新加入的映象節點不會同步舊的訊息。如果設定為automatic,則新加入的映象節點會把主節點中所有訊息都同步,會帶來額外的網路開銷
-
all模式
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
-
ha-all
:策略名稱,自定義 -
"^all\."
:匹配所有以all.
開頭的佇列名 -
'{"ha-mode":"all"}'
:策略內容-
"ha-mode":"all"
:策略模式,此處是all模式,即所有節點都會稱為映象節點
-
nodes模式
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
-
rabbitmqctl set_policy
:固定寫法 -
ha-nodes
:策略名稱,自定義 -
"^nodes\."
:匹配佇列的正則表示式,符合命名規則的佇列才生效,這裡是任何以nodes.
開頭的佇列名稱 -
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
: 策略內容-
"ha-mode":"nodes"
:策略模式,此處是nodes模式 -
"ha-params":["rabbit@mq1", "rabbit@mq2"]
:策略引數,這裡指定副本所在節點名稱
-
仲裁佇列
從RabbitMQ 3.8版本開始,引入了新的仲裁佇列,他具備與映象隊裡類似的功能,但使用更加方便。
叢集特徵
仲裁佇列:仲裁佇列是3.8版本以後才有的新功能,用來替代映象佇列,具備下列特徵:
- 與映象佇列一樣,都是主從模式,支援主從資料同步
- 使用非常簡單,沒有複雜的配置
- 主從同步基於Raft協議,強一致
仲裁佇列搭建
在任意控制檯新增一個佇列,一定要選擇佇列型別為Quorum型別。
在任意控制檯檢視佇列:
可以看到,仲裁佇列的 + 2字樣。代表這個佇列有2個映象節點。
因為仲裁佇列預設的映象數為5。如果你的叢集有7個節點,那麼映象數肯定是5;而我們叢集只有3個節點,因此映象數量就是3.
Java程式碼建立仲裁佇列
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁佇列
.build();
}
SpringAMQP連線MQ叢集
注意,這裡用address來代替host、port方式
spring:
rabbitmq:
addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
username: itcast
password: 123321
virtual-host: /