1. 程式人生 > 實用技巧 >訊息佇列之RabbitMq

訊息佇列之RabbitMq

一 什麼是訊息佇列(MQ)

MQ全稱為Message Queue 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取佇列中的訊息。這樣釋出者和使用者都不用知道對方的存在。

'''
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
'''
生產者消費者模型

我們先不管訊息(Message)這個詞,來看看佇列(Queue)。這一看,佇列大家應該都熟悉吧。

佇列是一種先進先出的資料結構。

訊息佇列可以簡單理解為:把要傳輸的資料放在佇列中。

二、為什麼要用訊息佇列?

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用解耦,非同步訊息,流量削鋒等問題,實現高效能,高可用,可伸縮和最終一致性架構。目前使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

接下來利用一個外賣系統的訊息推送給大家解釋下MQ的意義。

三RabbitMQ

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

rabbitMQ是一款基於AMQP協議的訊息中介軟體,它能夠在應用之間提供可靠的訊息傳輸。在易用性,擴充套件性,高可用性上表現優秀。使用訊息中介軟體利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。而且兩端可以使用不同的語言編寫,大大提供了靈活性。

中文文件

3.1 rabbitmq的安裝

3.2 rabbitMQ工作模型

3.2.1 簡單模式

### 生產者

import pika

# 1.連結rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2.宣告一個佇列(建立佇列)
channel.queue_declare(queue='hello')

# 3.向指定佇列插入資料
channel.basic_publish(exchange=''
, # 簡單模式 routing_key='hello', # 指定佇列 body='Hello World!') # 插入的資料 print(" [x] Sent 'Hello World!'") ### 消費者 import pika # 1.連結rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 2.建立佇列(程式不一定生產者先執行,如果生產者先執行並已經建立了佇列,這段程式碼相當於不執行) channel.queue_declare(queue='hello') # 3.確定回撥函式 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 4.確定監聽佇列引數 channel.basic_consume(queue='hello', auto_ack=True, # 預設應答 on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') # 開始監聽(正式監聽) channel.start_consuming()

3.2.2 引數

應答引數(消費者修改)

channel.basic_consume(queue='hello',
                      auto_ack=False,  # 預設應答改為手動應答
                      on_message_callback=callback)

def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
   ch.basic_ack(delivery_tag=method.delivery_tag)

持久化引數(生產者修改)

channel.queue_declare(queue='hello2',durable=True)

channel.basic_publish(exchange='',  # 簡單模式
       routing_key='hello2',  # 指定佇列
       body='Hello World!',
       properties=pika.BasicProperties(
           delivery_mode=2,  # make message persistent
           )
       )

分發引數(消費者,需將應答引數修改為手動應答)

有兩個消費者同時監聽一個的佇列。其中一個執行緒sleep2秒,另一個消費者執行緒sleep1秒,但是處理的訊息是一樣多。這種方式叫輪詢分發(round-robin)不管誰忙,都不會多給訊息,總是你一個我一個。想要做到公平分發(fair dispatch),必須關閉自動應答ack,改成手動應答。使用basicQos(perfetch=1)限制每次只發送不超過1條訊息到同一個消費者,消費者必須手動反饋告知佇列,才會傳送下一個。

channel.basic_qos(prefetch_count=1)

3.2.3 交換機模式(exchange)

交換機之釋出訂閱

釋出訂閱和簡單的訊息佇列區別在於,釋出訂閱會將訊息傳送給所有的訂閱者,而訊息佇列中的資料被消費一次便消失。所以,RabbitMQ實現釋出和訂閱時,會為每一個訂閱者建立一個佇列,而釋出者釋出訊息時,會將訊息放置在所有相關佇列中。

### 生產者
import pika

# 1.連結rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2.宣告一個名為logs、型別為fanout的交換機
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')  # fanout:釋出訂閱模式

# 向名為logs的交換機插入資料(message)
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

print(" [x] Sent %r" % message)
connection.close()


### 消費者
import pika

# 1.連結rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2.宣告一個名為logs、型別為fanout的交換機
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# 3.建立佇列
result = channel.queue_declare("", exclusive=True)  # exclusive=True 系統隨機佇列名
queue_name = result.method.queue  # result.method.queue 可以獲得佇列名稱

# 4.將佇列繫結到交換機上
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(" [*] Waiting for logs. To exit press CTRL+C")

# 5.確定回撥函式
def callback(ch, method, properties, body):
    print(" [*] %r" % body)

# 6.確定監聽佇列引數
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

# 開始監聽
channel.start_consuming()

交換機之關鍵字

### 生產者
import pika

# 1.連結rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2.宣告一個名為logs、型別為fanout的交換機
channel.exchange_declare(exchange='logs2',
                         exchange_type='direct')  # direct:關鍵字模式

# 向名為logs的交換機插入資料(message)
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='error',  # routing_key繫結關鍵字
                      body=message)

print(" [x] Sent %r" % message)
connection.close()


### 消費者
import pika

# 1.連結rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2.宣告一個名為logs、型別為fanout的交換機
channel.exchange_declare(exchange='logs2',
                         exchange_type='direct')

# 3.建立佇列
result = channel.queue_declare("", exclusive=True)  # exclusive=True 系統隨機佇列名
queue_name = result.method.queue  # result.method.queue 可以獲得佇列名稱

# 4.將佇列繫結到交換機上
channel.queue_bind(exchange='logs2',
                   queue=queue_name,
                   routing_key='error')  # routing_key繫結關鍵字

print(" [*] Waiting for logs. To exit press CTRL+C")

# 5.確定回撥函式
def callback(ch, method, properties, body):
    print(" [*] %r" % body)

# 6.確定監聽佇列引數
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

# 開始監聽
channel.start_consuming()

交換機之萬用字元

萬用字元交換機”與之前的路由模式相比,它將資訊的傳輸型別的key更加細化,以“key1.key2.keyN....”的模式來指定資訊傳輸的key的大型別和大型別下面的小型別,讓消費者可以更加精細的確認自己想要獲取的資訊型別。而在消費者一段,不用精確的指定具體到哪一個大型別下的小型別的key,而是可以使用類似正則表示式(但與正則表示式規則完全不同)的萬用字元在指定一定範圍或符合某一個字串匹配規則的key,來獲取想要的資訊。

“萬用字元交換機”(Topic Exchange)將路由鍵和某模式進行匹配。此時佇列需要繫結在一個模式上。符號“#”匹配一個或多個詞,符號“*”僅匹配一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”只會匹配到“audit.irs”。(這裡與我們一般的正則表示式的“*”和“#”剛好相反,這裡我們需要注意一下。)
下面是一個解釋萬用字元模式交換機工作的一個樣例

上面的交換機制類似於一個國際新聞訊息網站的機制。

### 生產者
import pika

# 1.連結rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2.宣告一個名為logs、型別為fanout的交換機
channel.exchange_declare(exchange='logs3',
                         exchange_type='topic')  # topic:萬用字元模式

# 向名為logs的交換機插入資料(message)
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='usa.news',  # routing_key繫結關鍵字
                      body=message)

print(" [x] Sent %r" % message)
connection.close()


### 消費者
import pika

# 1.連結rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2.宣告一個名為logs、型別為fanout的交換機
channel.exchange_declare(exchange='logs3',
                         exchange_type='topic')

# 3.建立佇列
result = channel.queue_declare("", exclusive=True)  # exclusive=True 系統隨機佇列名
queue_name = result.method.queue  # result.method.queue 可以獲得佇列名稱

# 4.將佇列繫結到交換機上
channel.queue_bind(exchange='logs2',
                   queue=queue_name,
                   routing_key='#.news')  # routing_key繫結關鍵字,#匹配一個或多個單詞,*僅匹配一個單詞

print(" [*] Waiting for logs. To exit press CTRL+C")

# 5.確定回撥函式
def callback(ch, method, properties, body):
    print(" [*] %r" % body)

# 6.確定監聽佇列引數
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

# 開始監聽
channel.start_consuming()