1. 程式人生 > 其它 >常見限流策略

常見限流策略

RabbitMQ與Redis對比

RabbitMQ是一種比較流行的訊息中介軟體,之前我一直使用redis作為訊息中介軟體,但是生產環境比較推薦RabbitMQ來替代Redis,所以我去查詢了一些RabbitMQ的資料。相比於Redis,RabbitMQ優點很多,比如:

  • 具有訊息消費確認機制

  • 佇列,訊息,都可以選擇是否持久化,粒度更小、更靈活。

  • 可以實現負載均衡

RabbitMQ應用場景

  • 非同步處理:比如使用者註冊時的確認郵件、簡訊等交由rabbitMQ進行非同步處理

  • 應用解耦:比如收發訊息雙方可以使用訊息佇列,具有一定的緩衝功能

  • 流量削峰:一般應用於秒殺活動,可以控制使用者人數,也可以降低流量

  • 日誌處理:將info、warning、error等不同的記錄分開儲存

RabbitMQ訊息模型

這裡使用 Pythonpika 這個庫來實現RabbitMQ中常見的6種訊息模型。沒有的可以先安裝:


pip install pika

1.單生產單消費模型:即完成基本的一對一訊息轉發。


# 生產者程式碼
import pika


credentials = pika.PlainCredentials('chuan', '123') # mq使用者名稱和密碼,沒有則需要自己建立
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))

# 建立rabbit協議的通道
channel = connection.channel()
# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable指定佇列是否持久化
channel.queue_declare(queue='python-test', durable=False)

# message不能直接傳送給queue,需經exchange到達queue,此處使用以空字串標識的預設的exchange
# 向佇列插入數值 routing_key是佇列名
channel.basic_publish(exchange='',
routing_key='python-test',
body='Hello world!2')
# 關閉與rabbitmq server的連線
connection.close()

# 消費者程式碼
import pika

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
channel = connection.channel()
# 申明訊息佇列。當不確定生產者和消費者哪個先啟動時,可以兩邊重複宣告訊息佇列。
channel.queue_declare(queue='python-test', durable=False)
# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch, method, properties, body):
# 手動傳送確認訊息
ch.basic_ack(delivery_tag=method.delivery_tag)
print(body.decode())
# 告訴生產者,消費者已收到訊息

# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將auto_ack明確指明為True,不對訊息進行確認。
channel.basic_consume('python-test',
on_message_callback=callback)
# auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()

2.訊息分發模型:多個收聽者監聽一個佇列。


# 生產者程式碼
import pika


credentials = pika.PlainCredentials('chuan', '123') # mq使用者名稱和密碼
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))

# 建立rabbit協議的通道
channel = connection.channel()
# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable指定佇列是否持久化。確保沒有確認的訊息不會丟失
channel.queue_declare(queue='rabbitmqtest', durable=True)

# message不能直接傳送給queue,需經exchange到達queue,此處使用以空字串標識的預設的exchange
# 向佇列插入數值 routing_key是佇列名
# basic_publish的properties引數指定message的屬性。此處delivery_mode=2指明message為持久的
for i in range(10):
channel.basic_publish(exchange='',
routing_key='python-test',
body='Hello world!%s' % i,
properties=pika.BasicProperties(delivery_mode=2))
# 關閉與rabbitmq server的連線
connection.close()

# 消費者程式碼,consume1與consume2
import pika
import time

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
channel = connection.channel()
# 申明訊息佇列。當不確定生產者和消費者哪個先啟動時,可以兩邊重複宣告訊息佇列。
channel.queue_declare(queue='rabbitmqtest', durable=True)
# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch, method, properties, body):
# 手動傳送確認訊息
time.sleep(10)
print(body.decode())
# 告訴生產者,消費者已收到訊息
ch.basic_ack(delivery_tag=method.delivery_tag)

# 如果該消費者的channel上未確認的訊息數達到了prefetch_count數,則不向該消費者傳送訊息
channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將no_ack明確指明為True,不對訊息進行確認。
channel.basic_consume('python-test',
on_message_callback=callback)
# auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()

3.fanout訊息訂閱模式:生產者將訊息傳送到Exchange,Exchange再轉發到與之繫結的Queue中,每個消費者再到自己的Queue中取訊息。


# 生產者程式碼
import pika


credentials = pika.PlainCredentials('chuan', '123') # mq使用者名稱和密碼
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
# 建立rabbit協議的通道
channel = connection.channel()
# fanout: 所有繫結到此exchange的queue都可以接收訊息(實時廣播)
# direct: 通過routingKey和exchange決定的那一組的queue可以接收訊息(有選擇接受)
# topic: 所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息(更細緻的過濾)
channel.exchange_declare('logs', exchange_type='fanout')


#因為是fanout廣播型別的exchange,這裡無需指定routing_key
for i in range(10):
channel.basic_publish(exchange='logs',
routing_key='',
body='Hello world!%s' % i)

# 關閉與rabbitmq server的連線
connection.close()

import pika

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
channel = connection.channel()

#作為好的習慣,在producer和consumer中分別宣告一次以保證所要使用的exchange存在
channel.exchange_declare(exchange='logs',
exchange_type='fanout')

# 隨機生成一個新的空的queue,將exclusive置為True,這樣在consumer從RabbitMQ斷開後會刪除該queue
# 是排他的。
result = channel.queue_declare('', exclusive=True)

# 用於獲取臨時queue的name
queue_name = result.method.queue

# exchange與queue之間的關係成為binding
# binding告訴exchange將message傳送該哪些queue
channel.queue_bind(exchange='logs',
queue=queue_name)

# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch, method, properties, body):
# 手動傳送確認訊息
print(body.decode())
# 告訴生產者,消費者已收到訊息
#ch.basic_ack(delivery_tag=method.delivery_tag)

# 如果該消費者的channel上未確認的訊息數達到了prefetch_count數,則不向該消費者傳送訊息
channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將no_ack明確指明為True,不對訊息進行確認。
channel.basic_consume(queue=queue_name,
on_message_callback=callback,
auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()

4.direct路由模式:此時生產者傳送訊息時需要指定RoutingKey,即路由Key,Exchange接收到訊息時轉發到與RoutingKey相匹配的佇列中。


# 生產者程式碼,測試命令可以使用:python produce.py error 404error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個名為direct_logs的direct型別的exchange
# direct型別的exchange
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')

# 從命令列獲取basic_publish的配置引數
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 向名為direct_logs的exchage按照設定的routing_key傳送message
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()
    # 消費者程式碼,測試可以使用:python consume.py error
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # 宣告一個名為direct_logs型別為direct的exchange
    # 同時在producer和consumer中宣告exchage或queue是個好習慣,以保證其存在
    channel.exchange_declare(exchange='direct_logs',
                 exchange_type='direct')
    
    result = channel.queue_declare('', exclusive=True)
    queue_name = result.method.queue
    
    # 從命令列獲取引數:routing_key
    severities = sys.argv[1:]
    if not severities:
      print(sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],))
      sys.exit(1)
    
    for severity in severities:
      # exchange和queue之間的binding可接受routing_key引數
      # fanout型別的exchange直接忽略該引數。direct型別的exchange精確匹配該關鍵字進行message路由
      # 一個消費者可以繫結多個routing_key
      # Exchange就是根據這個RoutingKey和當前Exchange所有繫結的BindingKey做匹配,
      # 如果滿足要求,就往BindingKey所繫結的Queue傳送訊息
      channel.queue_bind(exchange='direct_logs',
                queue=queue_name,
                routing_key=severity)
    
    def callback(ch, method, properties, body):
      print(" [x] %r:%r" % (method.routing_key, body,))
    
    
    channel.basic_consume(queue=queue_name,
               on_message_callback=callback,
               auto_ack=True)
    
    channel.start_consuming()

5.topic匹配模式:更細緻的分組,允許在RoutingKey中使用匹配符。

  • *:匹配一個單詞

  • #:匹配0個或多個單詞

    # 生產者程式碼,基本不變,只需將exchange_type改為topic(測試:python produce.py rabbitmq.red 
    # red color is my favorite
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # 宣告一個名為direct_logs的direct型別的exchange
    # direct型別的exchange
    channel.exchange_declare(exchange='topic_logs',
                 exchange_type='topic')
    
    # 從命令列獲取basic_publish的配置引數
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    
    # 向名為direct_logs的exchange按照設定的routing_key傳送message
    channel.basic_publish(exchange='topic_logs',
               routing_key=severity,
               body=message)
    
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
    # 消費者程式碼,(測試:python consume.py *.red)
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # 宣告一個名為direct_logs型別為direct的exchange
    # 同時在producer和consumer中宣告exchage或queue是個好習慣,以保證其存在
    channel.exchange_declare(exchange='topic_logs',
                 exchange_type='topic')
    
    result = channel.queue_declare('', exclusive=True)
    queue_name = result.method.queue
    
    # 從命令列獲取引數:routing_key
    severities = sys.argv[1:]
    if not severities:
      print(sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],))
      sys.exit(1)
    
    for severity in severities:
      # exchange和queue之間的binding可接受routing_key引數
      # fanout型別的exchange直接忽略該引數。direct型別的exchange精確匹配該關鍵字進行message路由
      # 一個消費者可以繫結多個routing_key
      # Exchange就是根據這個RoutingKey和當前Exchange所有繫結的BindingKey做匹配,
      # 如果滿足要求,就往BindingKey所繫結的Queue傳送訊息
      channel.queue_bind(exchange='topic_logs',
                queue=queue_name,
                routing_key=severity)
    
    def callback(ch, method, properties, body):
      print(" [x] %r:%r" % (method.routing_key, body,))
    
    
    channel.basic_consume(queue=queue_name,
               on_message_callback=callback,
               auto_ack=True)
    
    channel.start_consuming()

6.RPC遠端過程呼叫:客戶端與伺服器之間是完全解耦的,即兩端既是訊息的傳送者也是接受者。

    # 生產者程式碼
    import pika
    import uuid
    
    
    # 在一個類中封裝了connection建立、queue宣告、consumer配置、回撥函式等
    class FibonacciRpcClient(object):
      def __init__(self):
        # 建立到RabbitMQ Server的connection
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
        self.channel = self.connection.channel()
    
        # 宣告一個臨時的回撥佇列
        result = self.channel.queue_declare('', exclusive=True)
        self._queue = result.method.queue
    
        # 此處client既是producer又是consumer,因此要配置consume引數
        # 這裡的指明從client自己建立的臨時佇列中接收訊息
        # 並使用on_response函式處理訊息
        # 不對訊息進行確認
        self.channel.basic_consume(queue=self._queue,
                      on_message_callback=self.on_response,
                      auto_ack=True)
        self.response = None
        self.corr_id = None
    
      # 定義回撥函式
      # 比較類的corr_id屬性與props中corr_id屬性的值
      # 若相同則response屬性為接收到的message
      def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
          self.response = body
    
      def call(self, n):
        # 初始化response和corr_id屬性
        self.corr_id = str(uuid.uuid4())
    
        # 使用預設exchange向server中定義的rpc_queue傳送訊息
        # 在properties中指定replay_to屬性和correlation_id屬性用於告知遠端server
        # correlation_id屬性用於匹配request和response
        self.channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                        reply_to=self._queue,
                        correlation_id=self.corr_id,
                      ),
                      # message需為字串
                      body=str(n))
    
        while self.response is None:
          self.connection.process_data_events()
    
        return int(self.response)
    
    
    # 生成類的例項
    fibonacci_rpc = FibonacciRpcClient()
    
    print(" [x] Requesting fib(30)")
    # 呼叫例項的call方法
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)
    # 消費者程式碼,這裡以生成斐波那契數列為例
    import pika
    
    # 建立到達RabbitMQ Server的connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # 宣告一個名為rpc_queue的queue
    channel.queue_declare(queue='rpc_queue')
    
    # 計算指定數字的斐波那契數
    def fib(n):
      if n == 0:
        return 0
      elif n == 1:
        return 1
      else:
        return fib(n - 1) + fib(n - 2)
    
    # 回撥函式,從queue接收到message後呼叫該函式進行處理
    def on_request(ch, method, props, body):
      # 由message獲取要計算斐波那契數的數字
      n = int(body)
      print(" [.] fib(%s)" % n)
      # 呼叫fib函式獲得計算結果
      response = fib(n)
    
      # exchage為空字串則將message傳送個到routing_key指定的queue
      # 這裡queue為回撥函式引數props中reply_ro指定的queue
      # 要傳送的message為計算所得的斐波那契數
      # properties中correlation_id指定為回撥函式引數props中co的rrelation_id
      # 最後對訊息進行確認
      ch.basic_publish(exchange='',
               routing_key=props.reply_to,
               properties=pika.BasicProperties(correlation_id=props.correlation_id),
               body=str(response))
      ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    # 只有consumer已經處理並確認了上一條message時queue才分派新的message給它
    channel.basic_qos(prefetch_count=1)
    
    # 設定consumeer引數,即從哪個queue獲取訊息使用哪個函式進行處理,是否對訊息進行確認
    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
    
    print(" [x] Awaiting RPC requests")
    
    # 開始接收並處理訊息
    channel.start_consuming()