變數提升、函式提升
一、什麼是訊息佇列(MQ)
MQ全稱為Message Queue 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取佇列中的訊息。這樣釋出者和使用者都不用知道對方的存在。
''' 生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,
所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,
阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。 '''
我們先不管訊息(Message)這個詞,來看看佇列(Queue)。這一看,佇列大家應該都熟悉吧。
佇列是一種先進先出的資料結構。
訊息佇列可以簡單理解為:把要傳輸的資料放在佇列中。
二、為什麼要使用訊息佇列
訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用解耦,非同步訊息,流量削鋒等問題,實現高效能,高可用,可伸縮和最終一致性架構。目前使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。
接下來利用一個外賣系統的訊息推送給大家解釋下MQ的意義。
詳細請看:https://zhuanlan.zhihu.com/p/99783523
三、RabbitMQ
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。rabbitMQ是一款基於AMQP協議的訊息中介軟體
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
3.1、rabbitmq的安裝
3.2、RabbitMQ的工作模型之簡單模式
3.2.1、程式碼
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhostView Code')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") ### 消費者 import pika #連線rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() #建立一個名為‘hello’的佇列 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) #去像hello佇列取資料 channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
3.2.2、引數
應答引數(在消費者中操作)
''' 應答模式-當生產者監聽佇列,去產生任務放到佇列中,消費者去取的時候,如果中間出現bug或者業務邏輯錯誤的時候,
導致消費者無法接收任務,從而導致資料丟失,這個時候,如果修改完bug之後,佇列中已經沒有任務了 手動模式- 這個時候生成者把任務放到佇列中,當消費者想取的時候,佇列會分複製出一份給消費者,當消費者產生bug之後,對佇列中的資料也不會產生影響,
所以我們需要加以下的程式碼, 當修改完bug重啟之後,消費者會發送給對列一個確認的訊號,然後佇列會移除那個任務,從而使資料完整 ''' auto_ack=True #是應答模式 Fasle 為手動模式 #需要在消費者中業務結束部分增加 ch.basic_ack(delivery_tag=method.delivery_tag) ps:如果想要資料完整性就需要改為手動模式,如果需要速度改為應答模式
持久化引數(在生產者中操作)
''' 當生產者生產完資料的時候,rabbitmq崩了,這個時候消費者還沒取資料,當重啟rabbitmq的時候,佇列中已經沒有資料了 ,
因為之前的資料是存在記憶體中的, 當我們設定durable=True的時候,佇列中的資料會被持久化到硬碟中,重啟rabbitmq的時候,消費者可以取到資料 ''' #宣告queue channel.queue_declare(queue='hello2', durable=True) # 若宣告過,則換一個名字 channel.basic_publish(exchange='', routing_key='hello2', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) )
分發引數
有兩個消費者同時監聽一個的佇列。其中一個執行緒sleep2秒,另一個消費者執行緒sleep1秒,但是處理的訊息是一樣多。這種方式叫輪詢分發(round-robin)不管誰忙,都不會多給訊息,總是你一個我一個。想要做到公平分發(fair dispatch),必須關閉自動應答ack,改成手動應答。使用basicQos(perfetch=1)限制每次只發送不超過1條訊息到同一個消費者,消費者必須手動反饋告知佇列,才會傳送下一個。
channel.basic_qos(prefetch_count=1)
3.3、交換機模式
3.3.1、交換機之釋出訂閱(exchange_type='fanout')
釋出訂閱和簡單的訊息佇列區別在於,釋出訂閱會將訊息傳送給所有的訂閱者,而訊息佇列中的資料被消費一次便消失。所以,RabbitMQ實現釋出和訂閱時,會為每一個訂閱者建立一個佇列,而釋出者釋出訊息時,會將訊息放置在所有相關佇列中。
# 生產者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close() # 消費者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare("",exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) channel.start_consuming()View Code
3.3.2、交換機之關鍵字(exchange_type='direct')
這個在釋出訂閱的基礎上更健壯了一些, 可以根據設定的關鍵字去傳送訊息
# 生產者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs2', exchange_type='direct') message = "info: Hello Yuan!" channel.basic_publish(exchange='logs2', routing_key='info', body=message) print(" [x] Sent %r" % message) connection.close() # 消費者 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs2', exchange_type='direct') result = channel.queue_declare("",exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) channel.start_consuming()View Code
3.3.3、交換機之萬用字元(exchange_type='topic')
萬用字元交換機”與之前的路由模式相比,它將資訊的傳輸型別的key更加細化,以“key1.key2.keyN....”的模式來指定資訊傳輸的key的大型別和大型別下面的小型別,讓消費者可以更加精細的確認自己想要獲取的資訊型別。而在消費者一段,不用精確的指定具體到哪一個大型別下的小型別的key,而是可以使用類似正則表示式(但與正則表示式規則完全不同)的萬用字元在指定一定範圍或符合某一個字串匹配規則的key,來獲取想要的資訊。
“萬用字元交換機”(Topic Exchange)將路由鍵和某模式進行匹配。此時佇列需要繫結在一個模式上。符號“#”匹配一個或多個詞,符號“*”僅匹配一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”只會匹配到“audit.irs”。(這裡與我們一般的正則表示式的“*”和“#”剛好相反,這裡我們需要注意一下。)
下面是一個解釋萬用字元模式交換機工作的一個樣例
上面的交換機制類似於一個國際新聞訊息網站的機制
# 生產者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs3', exchange_type='topic') message = "info: Hello ERU!" channel.basic_publish(exchange='logs3', routing_key='europe.weather', body=message) print(" [x] Sent %r" % message) connection.close() # 消費者 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs3', exchange_type='topic') result = channel.queue_declare("",exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs3', queue=queue_name, routing_key="#.news") print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) channel.start_consuming()View Code
四、基於rabbitmq的RPC實現
4.1、關於RPC
https://www.cnblogs.com/pyedu/p/12196027.html
4.2、RPC的實現
如圖我們可以看出生產端client向消費端server請求處理資料,他會經歷如下幾次來完成互動。- 1.生產端 生成rpc_queue佇列,這個佇列負責幫消費者 接收資料並把訊息發給消費端。
- 2.生產端 生成另外一個隨機佇列,這個佇列是發給消費端,消費這個用這個佇列把處理好的資料傳送給生產端。
- 3.生產端 生成一組唯一字串UUID,傳送給消費者,消費者會把這串字元作為驗證在發給生產者。
- 4.當消費端處理完資料,發給生產端,時會把處理資料與UUID一起通過隨機生產的佇列發回給生產端。
- 5.生產端,會使用while迴圈 不斷檢測是否有資料,並以這種形式來實現阻塞等待資料,來監聽消費端。
- 6.生產端獲取資料呼叫回撥函式,回撥函式判斷本機的UUID與消費端發回UID是否匹配,由於消費端,可能有多個,且處理時間不等所以需要判斷,判斷成功賦值資料,while迴圈就會捕獲到,完成互動。
client
import pika import uuid import time # 斐波那契數列 前兩個數相加依次排列 class FibonacciRpcClient(object): def __init__(self): # 賦值變數,一個迴圈值 self.response = None # 連結遠端 self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() # 生成隨機queue result = self.channel.queue_declare("",exclusive=True) # 隨機取queue名字,發給消費端 self.callback_queue = result.method.queue # self.on_response 回撥函式:只要收到訊息就呼叫這個函式。 # 宣告收到訊息後就 收queue=self.callback_queue內的訊息 self.channel.basic_consume(queue=self.callback_queue, auto_ack=True, on_message_callback=self.on_response) # 收到訊息就呼叫 # ch 管道記憶體物件地址 # method 訊息發給哪個queue # body資料物件 def on_response(self, ch, method, props, body): # 判斷本機生成的ID 與 生產端發過來的ID是否相等 if self.corr_id == props.correlation_id: # 將body值 賦值給self.response self.response = body def call(self, n): # 隨機一次唯一的字串 self.corr_id = str(uuid.uuid4()) # routing_key='rpc_queue' 發一個訊息到rpc_queue內 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( # 執行命令之後結果返回給self.callaback_queue這個佇列中 reply_to = self.callback_queue, # 生成UUID 傳送給消費端 correlation_id = self.corr_id, ), # 發的訊息,必須傳入字串,不能傳數字 body=str(n)) # 沒有資料就迴圈收 while self.response is None: # 非阻塞版的start_consuming() # 沒有訊息不阻塞 self.connection.process_data_events() print("no msg...") time.sleep(0.5) return int(self.response) # 例項化 fibonacci_rpc = FibonacciRpcClient() response = fibonacci_rpc.call(50) print(" [.] Got %r" % response)client
server
#_*_coding:utf-8_*_ import pika import time # 連結socket connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 生成rpc queue channel.queue_declare(queue='rpc_queue') # 斐波那契數列 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) # 收到訊息就呼叫 # ch 管道記憶體物件地址 # method 訊息發給哪個queue # props 返回給消費的返回引數 # body資料物件 def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) # 呼叫斐波那契函式 傳入結果 response = fib(n) ch.basic_publish(exchange='', # 生產端隨機生成的queue routing_key=props.reply_to, # 獲取UUID唯一 字串數值 properties=pika.BasicProperties(correlation_id = \ props.correlation_id), # 訊息返回給生產端 body=str(response)) # 確保任務完成 # ch.basic_ack(delivery_tag = method.delivery_tag) # rpc_queue收到訊息:呼叫on_request回撥函式 # queue='rpc_queue'從rpc內收 channel.basic_consume(queue="rpc_queue", auto_ack=True, on_message_callback=on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()server