常見限流策略
阿新 • • 發佈:2021-06-17
RabbitMQ是一種比較流行的訊息中介軟體,之前我一直使用redis作為訊息中介軟體,但是生產環境比較推薦RabbitMQ來替代Redis,所以我去查詢了一些RabbitMQ的資料。相比於Redis,RabbitMQ優點很多,比如:
-
具有訊息消費確認機制
-
佇列,訊息,都可以選擇是否持久化,粒度更小、更靈活。
-
可以實現負載均衡
-
非同步處理:比如使用者註冊時的確認郵件、簡訊等交由rabbitMQ進行非同步處理
-
應用解耦:比如收發訊息雙方可以使用訊息佇列,具有一定的緩衝功能
-
流量削峰:一般應用於秒殺活動,可以控制使用者人數,也可以降低流量
-
日誌處理:將info、warning、error等不同的記錄分開儲存
RabbitMQ訊息模型
這裡使用 Python 的 pika 這個庫來實現RabbitMQ中常見的6種訊息模型。沒有的可以先安裝:
pip install pika
1.單生產單消費模型:即完成基本的一對一訊息轉發。
# 生產者程式碼
import pika
credentials = pika.PlainCredentials('chuan', '123') # mq使用者名稱和密碼,沒有則需要自己建立
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
# 建立rabbit協議的通道
channel = connection.channel()
# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable指定佇列是否持久化
channel.queue_declare(queue='python-test', durable=False)
# message不能直接傳送給queue,需經exchange到達queue,此處使用以空字串標識的預設的exchange
# 向佇列插入數值 routing_key是佇列名
channel.basic_publish(exchange='',
routing_key='python-test',
body='Hello world!2')
# 關閉與rabbitmq server的連線
connection.close()
# 消費者程式碼
import pika
credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
channel = connection.channel()
# 申明訊息佇列。當不確定生產者和消費者哪個先啟動時,可以兩邊重複宣告訊息佇列。
channel.queue_declare(queue='python-test', durable=False)
# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch, method, properties, body):
# 手動傳送確認訊息
ch.basic_ack(delivery_tag=method.delivery_tag)
print(body.decode())
# 告訴生產者,消費者已收到訊息
# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將auto_ack明確指明為True,不對訊息進行確認。
channel.basic_consume('python-test',
on_message_callback=callback)
# auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()
2.訊息分發模型:多個收聽者監聽一個佇列。
# 生產者程式碼
import pika
credentials = pika.PlainCredentials('chuan', '123') # mq使用者名稱和密碼
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
# 建立rabbit協議的通道
channel = connection.channel()
# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable指定佇列是否持久化。確保沒有確認的訊息不會丟失
channel.queue_declare(queue='rabbitmqtest', durable=True)
# message不能直接傳送給queue,需經exchange到達queue,此處使用以空字串標識的預設的exchange
# 向佇列插入數值 routing_key是佇列名
# basic_publish的properties引數指定message的屬性。此處delivery_mode=2指明message為持久的
for i in range(10):
channel.basic_publish(exchange='',
routing_key='python-test',
body='Hello world!%s' % i,
properties=pika.BasicProperties(delivery_mode=2))
# 關閉與rabbitmq server的連線
connection.close()
# 消費者程式碼,consume1與consume2
import pika
import time
credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
channel = connection.channel()
# 申明訊息佇列。當不確定生產者和消費者哪個先啟動時,可以兩邊重複宣告訊息佇列。
channel.queue_declare(queue='rabbitmqtest', durable=True)
# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch, method, properties, body):
# 手動傳送確認訊息
time.sleep(10)
print(body.decode())
# 告訴生產者,消費者已收到訊息
ch.basic_ack(delivery_tag=method.delivery_tag)
# 如果該消費者的channel上未確認的訊息數達到了prefetch_count數,則不向該消費者傳送訊息
channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將no_ack明確指明為True,不對訊息進行確認。
channel.basic_consume('python-test',
on_message_callback=callback)
# auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()
3.fanout訊息訂閱模式:生產者將訊息傳送到Exchange,Exchange再轉發到與之繫結的Queue中,每個消費者再到自己的Queue中取訊息。
# 生產者程式碼
import pika
credentials = pika.PlainCredentials('chuan', '123') # mq使用者名稱和密碼
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
# 建立rabbit協議的通道
channel = connection.channel()
# fanout: 所有繫結到此exchange的queue都可以接收訊息(實時廣播)
# direct: 通過routingKey和exchange決定的那一組的queue可以接收訊息(有選擇接受)
# topic: 所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息(更細緻的過濾)
channel.exchange_declare('logs', exchange_type='fanout')
#因為是fanout廣播型別的exchange,這裡無需指定routing_key
for i in range(10):
channel.basic_publish(exchange='logs',
routing_key='',
body='Hello world!%s' % i)
# 關閉與rabbitmq server的連線
connection.close()
import pika
credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
port=5672,
virtual_host='/',
credentials=credentials))
channel = connection.channel()
#作為好的習慣,在producer和consumer中分別宣告一次以保證所要使用的exchange存在
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 隨機生成一個新的空的queue,將exclusive置為True,這樣在consumer從RabbitMQ斷開後會刪除該queue
# 是排他的。
result = channel.queue_declare('', exclusive=True)
# 用於獲取臨時queue的name
queue_name = result.method.queue
# exchange與queue之間的關係成為binding
# binding告訴exchange將message傳送該哪些queue
channel.queue_bind(exchange='logs',
queue=queue_name)
# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch, method, properties, body):
# 手動傳送確認訊息
print(body.decode())
# 告訴生產者,消費者已收到訊息
#ch.basic_ack(delivery_tag=method.delivery_tag)
# 如果該消費者的channel上未確認的訊息數達到了prefetch_count數,則不向該消費者傳送訊息
channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將no_ack明確指明為True,不對訊息進行確認。
channel.basic_consume(queue=queue_name,
on_message_callback=callback,
auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()
4.direct路由模式:此時生產者傳送訊息時需要指定RoutingKey,即路由Key,Exchange接收到訊息時轉發到與RoutingKey相匹配的佇列中。
# 生產者程式碼,測試命令可以使用:python produce.py error 404error
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 宣告一個名為direct_logs的direct型別的exchange
# direct型別的exchange
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
# 從命令列獲取basic_publish的配置引數
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# 向名為direct_logs的exchage按照設定的routing_key傳送message
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
# 消費者程式碼,測試可以使用:python consume.py error import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 宣告一個名為direct_logs型別為direct的exchange # 同時在producer和consumer中宣告exchage或queue是個好習慣,以保證其存在 channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue # 從命令列獲取引數:routing_key severities = sys.argv[1:] if not severities: print(sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)) sys.exit(1) for severity in severities: # exchange和queue之間的binding可接受routing_key引數 # fanout型別的exchange直接忽略該引數。direct型別的exchange精確匹配該關鍵字進行message路由 # 一個消費者可以繫結多個routing_key # Exchange就是根據這個RoutingKey和當前Exchange所有繫結的BindingKey做匹配, # 如果滿足要求,就往BindingKey所繫結的Queue傳送訊息 channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body,)) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
5.topic匹配模式:更細緻的分組,允許在RoutingKey中使用匹配符。
-
*:匹配一個單詞
-
#:匹配0個或多個單詞
# 生產者程式碼,基本不變,只需將exchange_type改為topic(測試:python produce.py rabbitmq.red # red color is my favorite import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 宣告一個名為direct_logs的direct型別的exchange # direct型別的exchange channel.exchange_declare(exchange='topic_logs', exchange_type='topic') # 從命令列獲取basic_publish的配置引數 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' # 向名為direct_logs的exchange按照設定的routing_key傳送message channel.basic_publish(exchange='topic_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
# 消費者程式碼,(測試:python consume.py *.red) import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 宣告一個名為direct_logs型別為direct的exchange # 同時在producer和consumer中宣告exchage或queue是個好習慣,以保證其存在 channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue # 從命令列獲取引數:routing_key severities = sys.argv[1:] if not severities: print(sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)) sys.exit(1) for severity in severities: # exchange和queue之間的binding可接受routing_key引數 # fanout型別的exchange直接忽略該引數。direct型別的exchange精確匹配該關鍵字進行message路由 # 一個消費者可以繫結多個routing_key # Exchange就是根據這個RoutingKey和當前Exchange所有繫結的BindingKey做匹配, # 如果滿足要求,就往BindingKey所繫結的Queue傳送訊息 channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body,)) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
6.RPC遠端過程呼叫:客戶端與伺服器之間是完全解耦的,即兩端既是訊息的傳送者也是接受者。
# 生產者程式碼 import pika import uuid # 在一個類中封裝了connection建立、queue宣告、consumer配置、回撥函式等 class FibonacciRpcClient(object): def __init__(self): # 建立到RabbitMQ Server的connection self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() # 宣告一個臨時的回撥佇列 result = self.channel.queue_declare('', exclusive=True) self._queue = result.method.queue # 此處client既是producer又是consumer,因此要配置consume引數 # 這裡的指明從client自己建立的臨時佇列中接收訊息 # 並使用on_response函式處理訊息 # 不對訊息進行確認 self.channel.basic_consume(queue=self._queue, on_message_callback=self.on_response, auto_ack=True) self.response = None self.corr_id = None # 定義回撥函式 # 比較類的corr_id屬性與props中corr_id屬性的值 # 若相同則response屬性為接收到的message def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): # 初始化response和corr_id屬性 self.corr_id = str(uuid.uuid4()) # 使用預設exchange向server中定義的rpc_queue傳送訊息 # 在properties中指定replay_to屬性和correlation_id屬性用於告知遠端server # correlation_id屬性用於匹配request和response self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self._queue, correlation_id=self.corr_id, ), # message需為字串 body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) # 生成類的例項 fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") # 呼叫例項的call方法 response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
# 消費者程式碼,這裡以生成斐波那契數列為例 import pika # 建立到達RabbitMQ Server的connection connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 宣告一個名為rpc_queue的queue channel.queue_declare(queue='rpc_queue') # 計算指定數字的斐波那契數 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) # 回撥函式,從queue接收到message後呼叫該函式進行處理 def on_request(ch, method, props, body): # 由message獲取要計算斐波那契數的數字 n = int(body) print(" [.] fib(%s)" % n) # 呼叫fib函式獲得計算結果 response = fib(n) # exchage為空字串則將message傳送個到routing_key指定的queue # 這裡queue為回撥函式引數props中reply_ro指定的queue # 要傳送的message為計算所得的斐波那契數 # properties中correlation_id指定為回撥函式引數props中co的rrelation_id # 最後對訊息進行確認 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) # 只有consumer已經處理並確認了上一條message時queue才分派新的message給它 channel.basic_qos(prefetch_count=1) # 設定consumeer引數,即從哪個queue獲取訊息使用哪個函式進行處理,是否對訊息進行確認 channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) print(" [x] Awaiting RPC requests") # 開始接收並處理訊息 channel.start_consuming()