1. 程式人生 > >python3 RabbitMQ ( RPC!)

python3 RabbitMQ ( RPC!)

What This Tutorial Focuses On

在第二個教程中,我們學習瞭如何使用工作佇列在多個工作者之間分配耗時的任務。

但是,如果我們需要在遠端計算機上執行一個函式並等待結果呢?那是另一回事了。這種模式通常稱為遠端過程呼叫或RPC。

在本教程中,我們將使用RabbitMQ構建RPC系統:客戶機和可伸縮RPC伺服器。由於我們沒有任何值得分發的耗時任務,我們將建立一個返回斐波那契數的虛擬RPC服務。

Client interface

為了說明如何使用RPC服務,我們將建立一個簡單的客戶端類。它將公開一個名為call的方法,該方法傳送RPC請求並阻塞,直到接收到答案:

fibonacci_rpc =
FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % result)

A note on RPC 雖然RPC在計算中是一種非常常見的模式,但它經常受到批評。當程式設計師不知道函式呼叫是本地呼叫還是慢RPC時,問題就出現了。這樣的混亂會導致不可預測的系統,並增加不必要的除錯複雜性。濫用RPC可能導致無法維護的義大利麵條式程式碼,而不是簡化軟體。

記住這一點,考慮以下建議: 確保哪個函式呼叫是本地的,哪個函式呼叫是遠端的。 檔案系統。明確元件之間的依賴關係。 處理錯誤情況。當RPC伺服器長時間宕機時,客戶機應該如何反應?

在有疑問時避免RPC。如果可以,您應該使用非同步管道——而不是像rpc那樣阻塞,結果將非同步推到下一個計算階段。

Callback queue

一般來說,在RabbitMQ上執行RPC很容易。客戶端傳送請求訊息,伺服器使用響應訊息進行應答。為了接收響應,客戶端需要與請求一起傳送一個“回撥”佇列地址。讓我們試一試:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key=
'rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request) # ... and some code to read a response message from the callback_queue ...

AMQP 0-9-1協議預先定義了一組與訊息相關的14個屬性。除下列情況外,大部分物業很少使用:

1.delivery_mode:將訊息標記為持久化(值為2)或瞬態(任何其他值)。您可能還記得第二個教程中的這個屬性。 2.content_type:用於描述編碼的mime型別。例如,對於經常使用的JSON編碼,最好將此屬性設定為:application/ JSON。

3.reply_to:通常用於命名回撥佇列。 4.correlation_id:用於將RPC響應與請求關聯起來。

相互關聯id

在上述方法中,我們建議為每個RPC請求建立一個回撥佇列。這非常低效,但幸運的是,還有更好的方法——讓我們為每個客戶機建立一個回撥佇列。

這引發了一個新問題,在該佇列中接收到響應之後,不清楚響應屬於哪個請求。這時使用correlation_id屬性。我們會為每個請求設定一個唯一的值。稍後,當我們在回撥佇列中接收到訊息時,我們將檢視此屬性,並基於此,我們將能夠將響應與請求匹配。如果我們看到一個未知的correlation_id值,我們可能會安全地丟棄訊息——它不屬於我們的請求。

您可能會問,為什麼我們應該忽略回撥佇列中的未知訊息,而不是錯誤導致失敗?這是由於伺服器端可能存在競爭條件。儘管不太可能,但RPC伺服器可能會在向我們傳送答案之後,但在向請求傳送確認訊息之前死亡。如果發生這種情況,重新啟動的RPC伺服器將再次處理請求。這就是為什麼在客戶機上我們必須優雅地處理重複的響應,而RPC應該是等冪的。

摘要

在這裡插入圖片描述

我們的RPC將這樣工作:

當客戶端啟動時,它會建立一個匿名排他回撥佇列。 對於RPC請求,客戶端傳送具有兩個屬性的訊息:reply_to(設定為回撥佇列)和correlation_id(設定為每個請求的唯一值)。 請求被髮送到rpc_queue佇列。

RPC工作人員(又名:伺服器)正在等待該佇列上的請求。當出現請求時,它會執行任務並使用reply_to欄位中的佇列將結果傳送回客戶機。

客戶機等待回撥佇列上的資料。當訊息出現時,它檢查correlation_id屬性。如果與請求中的值匹配,則返回對應用程式的響應。

Putting it all together

The code for rpc_server.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

伺服器程式碼相當簡單: 像往常一樣,我們首先建立連線並宣告佇列。 (11)我們宣告斐波那契函式。它只假設有效的正整數輸入。(不要期望這個方法對大的數字有效,它可能是最慢的遞迴實現)。 我們為RPC伺服器的核心basic_consumption聲明瞭一個回撥。它在收到請求時執行。它完成工作並返回響應。 我們可能需要執行多個伺服器程序。為了在多個伺服器上平均分配負載,我們需要設定prefetch_count設定。

The code for rpc_client.py:

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

客戶端程式碼稍微複雜一些:

(7)我們建立一個連線,通道和宣告一個排他性的回覆“回撥”佇列。 (16)我們訂閱“回撥”佇列,以便接收RPC響應。 (18)在每個響應上執行的’on_response’回撥做的是一項非常簡單的工作,對於每個響應訊息,它檢查correlation_id是否是我們正在尋找的物件。如果是這樣,它將儲存self的響應。響應並打破消費迴圈。 接下來,我們定義我們的主呼叫方法——它執行實際的RPC請求。 (24)在這個方法中,首先我們生成一個唯一的correlation_id號並儲存它—'on_response’回撥函式將使用這個值來捕獲適當的響應。 接下來,我們釋出請求訊息,具有兩個屬性:reply_to和correlation_id。 在這一點上,我們可以坐下來等待適當的反應。 (33)最後我們將響應返回給使用者。