1. 程式人生 > 程式設計 >如何通過Python實現RabbitMQ延遲佇列

如何通過Python實現RabbitMQ延遲佇列

最近在做一任務時,遇到需要延遲處理的資料,最開始的做法是現將資料儲存在資料庫,然後寫個指令碼,隔五分鐘掃描資料表再處理資料,實際效果並不好。因為系統本身一直在用RabbitMQ做非同步處理任務的中介軟體,所以想到是否可以利用RabbitMQ實現延遲佇列。功夫不負有心人,RabbitMQ雖然沒有現成可用的延遲佇列,但是可以利用其兩個重要特性來實現之:1、Time To Live(TTL)訊息超時機制;2、Dead Letter Exchanges(DLX)死信佇列。下面將具體描述實現原理以及實現代

延遲佇列的基礎原理Time To Live(TTL)

RabbitMQ可以針對Queue設定x-expires 或者 針對Message設定 x-message-ttl,來控制訊息的生存時間,如果超時(兩者同時設定以最先到期的時間為準),則訊息變為dead letter(死信)

RabbitMQ訊息的過期時間有兩種方法設定。

通過佇列(Queue)的屬性設定,佇列中所有的訊息都有相同的過期時間。(本次延遲佇列採用的方案)對訊息單獨設定,每條訊息TTL可以不同。

如果同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為死信(dead letter)

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由轉發到指定的佇列。

  • x-dead-letter-exchange:出現死信(dead letter)之後將dead letter重新發送到指定exchange
  • x-dead-letter-routing-key:出現死信(dead letter)之後將dead letter重新按照指定的routing-key傳送

佇列中出現死信(dead letter)的情況有:

  • 訊息或者佇列的TTL過期。(延遲佇列利用的特性)
  • 佇列達到最大長度
  • 訊息被消費端拒絕(basic.reject or basic.nack)並且requeue=false

綜合上面兩個特性,將佇列設定TTL規則,佇列TTL過期後訊息會變成死信,然後利用DLX特性將其轉發到另外的交換機和佇列就可以被重新消費,達到延遲消費效果。

如何通過Python實現RabbitMQ延遲佇列

延遲佇列設計及實現(Python)

從上面描述,延遲佇列的實現大致分為兩步:

產生死信,有兩種方式Per-Message TTL和 Queue TTL,因為我的需求中是所有的訊息延遲處理時間相同,所以本實現中採用 Queue TTL設定佇列的TTL,如果需要將佇列中的訊息設定不同的延遲處理時間,則設定Per-Message TTL(官方文件)

設定死信的轉發規則,Dead Letter Exchanges設定方法(官方文件)

完整程式碼如下:

"""
Created on Fri Aug 3 17:00:44 2018

@author: Bge
"""
import pika,json,logging
class RabbitMQClient:
  def __init__(self,conn_str='amqp://user:pwd@host:port/%2F'):
    self.exchange_type = "direct"
    self.connection_string = conn_str
    self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
    self.channel = self.connection.channel()
    self._declare_retry_queue() #RetryQueue and RetryExchange
    logging.debug("connection established")
  def close_connection(self):
    self.connection.close()
    logging.debug("connection closed")
  def declare_exchange(self,exchange):
    self.channel.exchange_declare(exchange=exchange,exchange_type=self.exchange_type,durable=True)
  def declare_queue(self,queue):
    self.channel.queue_declare(queue=queue,durable=True,)
  def declare_delay_queue(self,queue,DLX='RetryExchange',TTL=60000):
    """
    建立延遲佇列
    :param TTL: ttl的單位是us,ttl=60000 表示 60s
    :param queue:
    :param DLX:死信轉發的exchange
    :return:
    """
    arguments={}
    if DLX:
      #設定死信轉發的exchange
      arguments[ 'x-dead-letter-exchange']=DLX
    if TTL:
      arguments['x-message-ttl']=TTL
    print(arguments)
    self.channel.queue_declare(queue=queue,arguments=arguments)
  def _declare_retry_queue(self):
    """
    建立異常交換器和佇列,用於存放沒有正常處理的訊息。
    :return:
    """
    self.channel.exchange_declare(exchange='RetryExchange',exchange_type='fanout',durable=True)
    self.channel.queue_declare(queue='RetryQueue',durable=True)
    self.channel.queue_bind('RetryQueue','RetryExchange','RetryQueue')
  def publish_message(self,routing_key,msg,exchange='',delay=0,TTL=None):
    """
    傳送訊息到指定的交換器
    :param exchange: RabbitMQ交換器
    :param msg: 訊息實體,是一個序列化的JSON字串
    :return:
    """
    if delay==0:
      self.declare_queue(routing_key)
    else:
      self.declare_delay_queue(routing_key,TTL=TTL)
    if exchange!='':
      self.declare_exchange(exchange)
    self.channel.basic_publish(exchange=exchange,routing_key=routing_key,body=msg,properties=pika.BasicProperties(
                    delivery_mode=2,type=exchange
                  ))
    self.close_connection()
    print("message send out to %s" % exchange)
    logging.debug("message send out to %s" % exchange)
  def start_consume(self,callback,queue='#',delay=1):
    """
    啟動消費者,開始消費RabbitMQ中的訊息
    :return:
    """
    if delay==1:
      queue='RetryQueue'
    else:
      self.declare_queue(queue)
    self.channel.basic_qos(prefetch_count=1)
    try:
      self.channel.basic_consume( # 消費訊息
        callback,# 如果收到訊息,就呼叫callback函式來處理訊息
        queue=queue,# 你要從那個佇列裡收訊息
      )
      self.channel.start_consuming()
    except KeyboardInterrupt:
      self.stop_consuming()
  def stop_consuming(self):
    self.channel.stop_consuming()
    self.close_connection()
  def message_handle_successfully(channel,method):
    """
    如果訊息處理正常完成,必須呼叫此方法,
    否則RabbitMQ會認為訊息處理不成功,重新將訊息放回待執行佇列中
    :param channel: 回撥函式的channel引數
    :param method: 回撥函式的method引數
    :return:
    """
    channel.basic_ack(delivery_tag=method.delivery_tag)
  def message_handle_failed(channel,method):
    """
    如果訊息處理失敗,應該呼叫此方法,會自動將訊息放入異常佇列
    :param channel: 回撥函式的channel引數
    :param method: 回撥函式的method引數
    :return:
    """
    channel.basic_reject(delivery_tag=method.delivery_tag,requeue=False)

釋出訊息程式碼如下:

from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,TTL=10000)
print("message send out")

消費者程式碼如下:

from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch,method,properties,body):
    msg = body.decode()
    print(msg)
    # 如果處理成功,則呼叫此訊息回覆ack,表示訊息成功處理完成。
    RabbitMQClient.message_handle_successfully(ch,method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。