1. 程式人生 > 程式設計 >Python實現RabbitMQ6種訊息模型的示例程式碼

Python實現RabbitMQ6種訊息模型的示例程式碼

RabbitMQ與Redis對比

​ RabbitMQ是一種比較流行的訊息中介軟體,之前我一直使用redis作為訊息中介軟體,但是生產環境比較推薦RabbitMQ來替代Redis,所以我去查詢了一些RabbitMQ的資料。相比於Redis,RabbitMQ優點很多,比如:

  • 具有訊息消費確認機制
  • 佇列,訊息,都可以選擇是否持久化,粒度更小、更靈活。
  • 可以實現負載均衡

RabbitMQ應用場景

  • 非同步處理:比如使用者註冊時的確認郵件、簡訊等交由rabbitMQ進行非同步處理
  • 應用解耦:比如收發訊息雙方可以使用訊息佇列,具有一定的緩衝功能
  • 流量削峰:一般應用於秒殺活動,可以控制使用者人數,也可以降低流量
  • 日誌處理:將info、warning、error等不同的記錄分開儲存

RabbitMQ訊息模型

​ 這裡使用 Pythonpika 這個庫來實現RabbitMQ中常見的6種訊息模型。沒有的可以先安裝:

pip install pika

1.單生產單消費模型:即完成基本的一對一訊息轉發。

Python實現RabbitMQ6種訊息模型的示例程式碼

# 生產者程式碼
import pika


credentials = pika.PlainCredentials('chuan','123') # mq使用者名稱和密碼,沒有則需要自己建立
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials))

# 建立rabbit協議的通道
channel = connection.channel()
# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable指定佇列是否持久化
channel.queue_declare(queue='python-test',durable=False)

# message不能直接傳送給queue,需經exchange到達queue,此處使用以空字串標識的預設的exchange
# 向佇列插入數值 routing_key是佇列名
channel.basic_publish(exchange='',routing_key='python-test',body='Hello world!2')
# 關閉與rabbitmq server的連線
connection.close()
# 消費者程式碼
import pika

credentials = pika.PlainCredentials('chuan','123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
channel = connection.channel()
# 申明訊息佇列。當不確定生產者和消費者哪個先啟動時,可以兩邊重複宣告訊息佇列。
channel.queue_declare(queue='python-test',durable=False)
# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch,method,properties,body):
  # 手動傳送確認訊息
  ch.basic_ack(delivery_tag=method.delivery_tag)
  print(body.decode())
  # 告訴生產者,消費者已收到訊息

# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將auto_ack明確指明為True,不對訊息進行確認。
channel.basic_consume('python-test',on_message_callback=callback)
           # auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()

2.訊息分發模型:多個收聽者監聽一個佇列。

Python實現RabbitMQ6種訊息模型的示例程式碼

# 生產者程式碼
import pika


credentials = pika.PlainCredentials('chuan','123') # mq使用者名稱和密碼
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))

# 建立rabbit協議的通道
channel = connection.channel()
# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable指定佇列是否持久化。確保沒有確認的訊息不會丟失
channel.queue_declare(queue='rabbitmqtest',durable=True)

# message不能直接傳送給queue,需經exchange到達queue,此處使用以空字串標識的預設的exchange
# 向佇列插入數值 routing_key是佇列名
# basic_publish的properties引數指定message的屬性。此處delivery_mode=2指明message為持久的
for i in range(10):
  channel.basic_publish(exchange='',body='Hello world!%s' % i,properties=pika.BasicProperties(delivery_mode=2))
# 關閉與rabbitmq server的連線
connection.close()
# 消費者程式碼,consume1與consume2
import pika
import time

credentials = pika.PlainCredentials('chuan',credentials=credentials))
channel = connection.channel()
# 申明訊息佇列。當不確定生產者和消費者哪個先啟動時,可以兩邊重複宣告訊息佇列。
channel.queue_declare(queue='rabbitmqtest',durable=True)
# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch,body):
  # 手動傳送確認訊息
  time.sleep(10)
  print(body.decode())
  # 告訴生產者,消費者已收到訊息
  ch.basic_ack(delivery_tag=method.delivery_tag)

# 如果該消費者的channel上未確認的訊息數達到了prefetch_count數,則不向該消費者傳送訊息
channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將no_ack明確指明為True,不對訊息進行確認。
channel.basic_consume('python-test',on_message_callback=callback)
           # auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()

3.fanout訊息訂閱模式:生產者將訊息傳送到Exchange,Exchange再轉發到與之繫結的Queue中,每個消費者再到自己的Queue中取訊息。

Python實現RabbitMQ6種訊息模型的示例程式碼

# 生產者程式碼
import pika


credentials = pika.PlainCredentials('chuan',credentials=credentials))
# 建立rabbit協議的通道
channel = connection.channel()
# fanout: 所有繫結到此exchange的queue都可以接收訊息(實時廣播)
# direct: 通過routingKey和exchange決定的那一組的queue可以接收訊息(有選擇接受)
# topic: 所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息(更細緻的過濾)
channel.exchange_declare('logs',exchange_type='fanout')


#因為是fanout廣播型別的exchange,這裡無需指定routing_key
for i in range(10):
  channel.basic_publish(exchange='logs',routing_key='',body='Hello world!%s' % i)

# 關閉與rabbitmq server的連線
connection.close()
import pika

credentials = pika.PlainCredentials('chuan',credentials=credentials))
channel = connection.channel()

#作為好的習慣,在producer和consumer中分別宣告一次以保證所要使用的exchange存在
channel.exchange_declare(exchange='logs',exchange_type='fanout')

# 隨機生成一個新的空的queue,將exclusive置為True,這樣在consumer從RabbitMQ斷開後會刪除該queue
# 是排他的。
result = channel.queue_declare('',exclusive=True)

# 用於獲取臨時queue的name
queue_name = result.method.queue

# exchange與queue之間的關係成為binding
# binding告訴exchange將message傳送該哪些queue
channel.queue_bind(exchange='logs',queue=queue_name)

# 定義一個回撥函式來處理訊息佇列中的訊息,這裡是打印出來
def callback(ch,body):
  # 手動傳送確認訊息
  print(body.decode())
  # 告訴生產者,消費者已收到訊息
  #ch.basic_ack(delivery_tag=method.delivery_tag)

# 如果該消費者的channel上未確認的訊息數達到了prefetch_count數,則不向該消費者傳送訊息
channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接收訊息
# 預設情況下是要對訊息進行確認的,以防止訊息丟失。
# 此處將no_ack明確指明為True,不對訊息進行確認。
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True) # 自動傳送確認訊息
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()

4.direct路由模式:此時生產者傳送訊息時需要指定RoutingKey,即路由Key,Exchange接收到訊息時轉發到與RoutingKey相匹配的佇列中。

Python實現RabbitMQ6種訊息模型的示例程式碼

# 生產者程式碼,測試命令可以使用:python produce.py error 404error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個名為direct_logs的direct型別的exchange
# direct型別的exchange
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

# 從命令列獲取basic_publish的配置引數
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 向名為direct_logs的exchage按照設定的routing_key傳送message
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)

print(" [x] Sent %r:%r" % (severity,message))
connection.close()
# 消費者程式碼,測試可以使用:python consume.py error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個名為direct_logs型別為direct的exchange
# 同時在producer和consumer中宣告exchage或queue是個好習慣,以保證其存在
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare('',exclusive=True)
queue_name = result.method.queue

# 從命令列獲取引數:routing_key
severities = sys.argv[1:]
if not severities:
  print(sys.stderr,"Usage: %s [info] [warning] [error]" % (sys.argv[0],))
  sys.exit(1)

for severity in severities:
  # exchange和queue之間的binding可接受routing_key引數
  # fanout型別的exchange直接忽略該引數。direct型別的exchange精確匹配該關鍵字進行message路由
  # 一個消費者可以繫結多個routing_key
  # Exchange就是根據這個RoutingKey和當前Exchange所有繫結的BindingKey做匹配,
  # 如果滿足要求,就往BindingKey所繫結的Queue傳送訊息
  channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

def callback(ch,body):
  print(" [x] %r:%r" % (method.routing_key,body,))


channel.basic_consume(queue=queue_name,auto_ack=True)

channel.start_consuming()

5.topic匹配模式:更細緻的分組,允許在RoutingKey中使用匹配符。

  • *:匹配一個單詞
  • #:匹配0個或多個單詞

Python實現RabbitMQ6種訊息模型的示例程式碼

# 生產者程式碼,基本不變,只需將exchange_type改為topic(測試:python produce.py rabbitmq.red 
# red color is my favorite
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個名為direct_logs的direct型別的exchange
# direct型別的exchange
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

# 從命令列獲取basic_publish的配置引數
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 向名為direct_logs的exchange按照設定的routing_key傳送message
channel.basic_publish(exchange='topic_logs',message))
connection.close()
# 消費者程式碼,(測試:python consume.py *.red)
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個名為direct_logs型別為direct的exchange
# 同時在producer和consumer中宣告exchage或queue是個好習慣,以保證其存在
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

result = channel.queue_declare('',))
  sys.exit(1)

for severity in severities:
  # exchange和queue之間的binding可接受routing_key引數
  # fanout型別的exchange直接忽略該引數。direct型別的exchange精確匹配該關鍵字進行message路由
  # 一個消費者可以繫結多個routing_key
  # Exchange就是根據這個RoutingKey和當前Exchange所有繫結的BindingKey做匹配,
  # 如果滿足要求,就往BindingKey所繫結的Queue傳送訊息
  channel.queue_bind(exchange='topic_logs',auto_ack=True)

channel.start_consuming()

6.RPC遠端過程呼叫:客戶端與伺服器之間是完全解耦的,即兩端既是訊息的傳送者也是接受者。

Python實現RabbitMQ6種訊息模型的示例程式碼

# 生產者程式碼
import pika
import uuid


# 在一個類中封裝了connection建立、queue宣告、consumer配置、回撥函式等
class FibonacciRpcClient(object):
  def __init__(self):
    # 建立到RabbitMQ Server的connection
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    self.channel = self.connection.channel()

    # 宣告一個臨時的回撥佇列
    result = self.channel.queue_declare('',exclusive=True)
    self._queue = result.method.queue

    # 此處client既是producer又是consumer,因此要配置consume引數
    # 這裡的指明從client自己建立的臨時佇列中接收訊息
    # 並使用on_response函式處理訊息
    # 不對訊息進行確認
    self.channel.basic_consume(queue=self._queue,on_message_callback=self.on_response,auto_ack=True)
    self.response = None
    self.corr_id = None

  # 定義回撥函式
  # 比較類的corr_id屬性與props中corr_id屬性的值
  # 若相同則response屬性為接收到的message
  def on_response(self,ch,props,body):
    if self.corr_id == props.correlation_id:
      self.response = body

  def call(self,n):
    # 初始化response和corr_id屬性
    self.corr_id = str(uuid.uuid4())

    # 使用預設exchange向server中定義的rpc_queue傳送訊息
    # 在properties中指定replay_to屬性和correlation_id屬性用於告知遠端server
    # correlation_id屬性用於匹配request和response
    self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(
                    reply_to=self._queue,correlation_id=self.corr_id,),# message需為字串
                  body=str(n))

    while self.response is None:
      self.connection.process_data_events()

    return int(self.response)


# 生成類的例項
fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
# 呼叫例項的call方法
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
# 消費者程式碼,這裡以生成斐波那契數列為例
import pika

# 建立到達RabbitMQ Server的connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個名為rpc_queue的queue
channel.queue_declare(queue='rpc_queue')

# 計算指定數字的斐波那契數
def fib(n):
  if n == 0:
    return 0
  elif n == 1:
    return 1
  else:
    return fib(n - 1) + fib(n - 2)

# 回撥函式,從queue接收到message後呼叫該函式進行處理
def on_request(ch,body):
  # 由message獲取要計算斐波那契數的數字
  n = int(body)
  print(" [.] fib(%s)" % n)
  # 呼叫fib函式獲得計算結果
  response = fib(n)

  # exchage為空字串則將message傳送個到routing_key指定的queue
  # 這裡queue為回撥函式引數props中reply_ro指定的queue
  # 要傳送的message為計算所得的斐波那契數
  # properties中correlation_id指定為回撥函式引數props中co的rrelation_id
  # 最後對訊息進行確認
  ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))
  ch.basic_ack(delivery_tag=method.delivery_tag)


# 只有consumer已經處理並確認了上一條message時queue才分派新的message給它
channel.basic_qos(prefetch_count=1)

# 設定consumeer引數,即從哪個queue獲取訊息使用哪個函式進行處理,是否對訊息進行確認
channel.basic_consume(queue='rpc_queue',on_message_callback=on_request)

print(" [x] Awaiting RPC requests")

# 開始接收並處理訊息
channel.start_consuming()

到此這篇關於Python實現RabbitMQ6種訊息模型的示例程式碼的文章就介紹到這了,更多相關Python RabbitMQ訊息模型 內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!