1. 程式人生 > 其它 >死信訊息和死信佇列

死信訊息和死信佇列

死信訊息和死信佇列定義

Dead Letter Exchange 死信佇列(DLX)佇列的簡稱。

另外對於死信訊息:通常如果我們的一個訊息存在以下的情況下的話則這訊息被稱為死信訊息:

  • 1: 訊息被消費端拒絕,使用 channel.basicNackchannel.basicReject ,並且此時requeue 屬性被設定為false

  • 2: 訊息在佇列的存活時間超過設定的TTL時間

  • 3:訊息佇列的訊息數量已經超過最大佇列長度,無法再繼續新增訊息到MQ中

  • 4:一個佇列中的訊息的TTL對其他佇列中同一條訊息的TTL沒有影響

對於死信訊息的處理,Rabbitmq會依據是否配置死信佇列的配置來決定訊息的去留! 如果開啟了配置死信佇列資訊,則訊息會被轉移到這個 死信佇列(DLX

)中,如果沒有配置,則此訊息會被丟棄!

死信佇列配置

  • 可以為每一個需要使用死信業務的佇列配置一個死信交換機

  • 每個佇列都可以配置專屬自己的死信佇列,相關訊息的進入死信佇列需要經過死信交換機來程序歸納處理

  • 死信交換機也只是一個普通的交換機,只是它是用來專門處理死信的交換機

  • 建立佇列時可以給這個佇列附帶一個死信的交換機,在這個佇列裡因各自情況出現問題的作廢的訊息會被重新發到附帶的交換機,然後讓這個交換機重新路由這條訊息。

具體的圖示:

若要使用策略指定DLX,請將鍵“死信交換”新增到策略定義中。例如:

rabbitmqctl    
rabbitmqctl set_policy DLX 
".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues Rabbitmqctl(Windows) rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues

上面的策略將DLX佇列“my-dlx”應用於所有佇列。上面只是一個例子,實際上不同的佇列可能會使用不同的死字設定(或者根本不使用)。

其他配置死信隊裡的方式有:

x-dead-letter-exchange:出現死信(dead letter)之後將死信(dead letter)重新發送到指定exchange
x
-dead-letter-routing-key:出現死信(dead letter)之後將死信(dead letter)重新按照指定的routing-key傳送
PS:當指定了死信交換機後時,除了通常對宣告佇列的配置許可權外,使用者還需要對該佇列具有讀取許可權,並對死信交換機具有寫許可權。許可權在佇列宣告時進行驗證。

完整的一個簡單的示例:

下面的示例主要是演示裡: 1:設定訊息的過期的時間為2s,2s之後就變為我們的死信

2:變為死信的訊息,會被轉移到我們的另一個死信交換機的佇列上

# !/usr/bin/env python
import pika
import sys

# 建立使用者登入的憑證,使用rabbitmq使用者密碼登入
credentials = pika.PlainCredentials("guest","guest")
# 建立連線
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
# 通過連線建立通道
channel = connection.channel()

# ========
#   建立異常交換器和佇列,用於存放沒有正常處理的訊息。
channel.exchange_declare(exchange='xz-dead-letter-exchange',exchange_type='fanout',durable=True)
channel.queue_declare(queue='xz-dead-letter-queue',durable=True)
# 繫結佇列到指定的交換機
channel.queue_bind(queue='xz-dead-letter-queue',exchange= 'xz-dead-letter-exchange',routing_key= 'xz-dead-letter-queue')

# =========

# 通過通道建立我們的佇列 其中名稱是task_queue,並且這個佇列的訊息是需要持久化的!PS:持久化儲存存到磁碟會佔空間,
# 佇列不能由持久化變為普通佇列,反過來也是!否則會報錯!所以佇列型別建立的開始必須確定的!
arguments = {}
# TTL: ttl的單位是us,ttl=60000 表示 60s
# arguments['x-message-ttl'] = 2000
# 指定死信轉移到另一個交換機上具體的交換機的名稱
arguments['x-dead-letter-exchange'] = 'xz-dead-letter-exchange'
#  auto_delete=False,  # 最後一個佇列解綁則刪除  durable
# durable 和 x-message-ttl 不能同時的存在
channel.queue_declare(queue='task_queue', durable=True,arguments=arguments,auto_delete=False)
# 定義需要發的訊息內容
# 開始釋出訊息到我們的代理伺服器上,注意這裡沒有對發生訊息進行確認發生成功!!!
import time
for i in range(1,100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 欄位以微秒為單位表示 TTL 值,6 秒的 message
    properties.expiration='2000'
    body = '小鐘同學你好!{}'.format(i).encode('utf-8')
    print(body.decode('utf-8'))
    channel.basic_publish(
        # 預設使用的/的交換機
        exchange='',
        # 預設的匹配的key
        routing_key='task_queue',
        # 傳送的訊息的內容
        body=body,
        # 發現的訊息的型別
        properties=properties# pika.BasicProperties中的delivery_mode=2指明message為持久的,1 的話 表示不是持久化 2:表示持久化
    )

connection.close()

執行上面的生產者的程式碼後觀察我們的輸出: 中國發出了8個訊息

小鐘同學你好!1
小鐘同學你好!2
小鐘同學你好!3
小鐘同學你好!4
小鐘同學你好!5
小鐘同學你好!6
小鐘同學你好!7
小鐘同學你好!8

結果這個8個訊息都沒有人去消費的時候:最後都轉移到了死信的佇列裡面:

關於死信佇列需要注意的點(來自官網的說明):

訊息在釋出到死信佇列後DLX目標佇列後會立即從原始佇列中刪除。這確保沒有可能出現過多的訊息積累,從而耗盡代理資源,但這確實意味著,如果目標佇列無法接受訊息,訊息可能會丟失

死信佇列裡面的死信的消費

當我們的死信消費者去消費死信訊息時候,需要注意點有:

我們的“死信”訊息訊息的properties裡面的header欄位資訊中增加一個叫做“x-death"的陣列內容,包含了以下欄位內容:

<BasicProperties(['delivery_mode=2', "headers={'x-death': [{'count': 1L, 'reason': 'expired', 'queue': 'task_queue', 'time': datetime.datetime(2021, 6, 22, 8, 40, 1), 'exchange': '', 'routing-keys': ['task_queue'], 'original-expiration': '2000'}], 'x-first-death-exchange': '', 'x-first-death-queue': 'task_queue', 'x-first-death-reason': 'expired'}"])>

其中我們的'x-death'內容為::

{'x-death': [{'count': 1L, 'reason': 'expired', 'queue': 'task_queue', 'time': datetime.datetime(2021, 6, 22, 8, 40, 1), 'exchange': '', 'routing-keys': ['task_queue'], 'original-expiration': '2000'}], 'x-first-death-exchange': '', 'x-first-death-queue': 'task_queue', 'x-first-death-reason': 'expired'}

具體每個欄位的意思是:

  • queue :進入死信佇列之前來自於哪個的訊息佇列名稱
  • reason:這個訊息變為死信的原因?expired 表示是因為過期!變為死信!
  • count:這個訊息在這個佇列中被死了多少次
  • time:該訊息釋出時間
  • exchange :訊息已釋出到哪些交換機上,PS:如果這個訊息是多次變為死信的話,這個地方最後就是死信的交換機
  • routing-keys 訊息發不來來源的路由keys
  • original-expiration:原訊息的過期時間屬性,PS:(如果訊息是死信的話)每條訊息ttl):。這個過期屬性將從死信中刪除,以防止它在被路由到的任何佇列中再次過期。
  • x-first-death-exchange:第一次變成死死信的時候來源的交換機
  • x-first-death-queue:第一次變成死信的時候來源佇列
  • x-first-death-reason:第一次變成死信的原因:expired 表示是因為過期!
其他變為死信的原因的說明:
rejected: 訊息被消費者拒收且回放到訊息獨立
expired: 訊息的設定來TTL時間到期
maxlen: 超過了佇列執行的最大的值

延遲佇列

RabbitMQ本身沒有直接支援延遲佇列功能,但是通過對死信佇列和過期時間的使用,其實我們可以綜合起上面的兩個特性來實現一個所謂的延遲佇列,延遲佇列的意思就是:

某個訊息再某個固定的時間後失效後,則進入到死信佇列裡面,其他死信的消費者實時的處理這些過期的訊息,這個就可以起到一個延遲處理的效果!

延遲佇列加上惰性佇列這種組合吧!其實也是可以考慮的!,即可以減小記憶體佔用,又可以實現訊息的延遲處理