遠程過程調用(RPC)
在第二篇教程中我們介紹了如何使用工作隊列(work queue)在多個工作者(woker)中間分發耗時的任務。
可是如果我們需要將一個函數運行在遠程計算機上並且等待從那兒獲取結果時,該怎麽辦呢?這就是另外的故事了。這種模式通常被稱為遠程過程調用(Remote Procedure Call)或者RPC。
這篇教程中,我們會使用RabbitMQ來構建一個RPC系統:包含一個客戶端和一個RPC服務器。現在的情況是,我們沒有一個值得被分發的足夠耗時的任務,所以接下來,我們會創建一個模擬RPC服務來返回斐波那契數列。
客戶端接口
為了展示RPC服務如何使用,我們創建了一個簡單的客戶端類。它會暴露出一個名為“call”的方法用來發送一個RPC請求,並且在收到回應前保持阻塞。
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)
關於RPC的註意事項:
盡管RPC在計算領域是一個常用模式,但它也經常被詬病。當一個問題被拋出的時候,程序員往往意識不到這到底是由本地調用還是由較慢的RPC調用引起的。同樣的困惑還來自於系統的不可預測性和給調試工作帶來的不必要的復雜性。跟軟件精簡不同的是,濫用RPC會導致不可維護的面條代碼.
考慮到這一點,牢記以下建議:
確保能夠明確的搞清楚哪個函數是本地調用的,哪個函數是遠程調用的。給你的系統編寫文檔。保持各個組件間的依賴明確。處理錯誤案例。明了客戶端改如何處理RPC服務器的宕機和長時間無響應情況。
當對避免使用RPC有疑問的時候。如果可以的話,你應該盡量使用異步管道來代替RPC類的阻塞。結果被異步地推送到下一個計算場景。
回調隊列
一般來說通過RabbitMQ來實現RPC是很容易的。一個客戶端發送請求信息,服務器端將其應用到一個回復信息中。為了接收到回復信息,客戶端需要在發送請求的時候同時發送一個回調隊列(callback queue)的地址。我們試試看:
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange=‘‘,
routing_key=‘rpc_queue‘,
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
# ... and some code to read a response message from the callback_queue ...
消息屬性
AMQP協議給消息預定義了一系列的14個屬性。大多數屬性很少會用到,除了以下幾個:
- delivery_mode(投遞模式):將消息標記為持久的(值為2)或暫存的(除了2之外的其他任何值)。第二篇教程裏接觸過這個屬性,記得吧?
- content_type(內容類型):用來描述編碼的mime-type。例如在實際使用中常常使用application/json來描述JOSN編碼類型。
- reply_to(回復目標):通常用來命名回調隊列。
- correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。
關聯標識
上邊介紹的方法中,我們建議給每一個RPC請求新建一個回調隊列。這不是一個高效的做法,幸好這兒有一個更好的辦法 —— 我們可以為每個客戶端只建立一個獨立的回調隊列。
這就帶來一個新問題,當此隊列接收到一個響應的時候它無法辨別出這個響應是屬於哪個請求的。correlation_id 就是為了解決這個問題而來的。我們給每個請求設置一個獨一無二的值。稍後,當我們從回調隊列中接收到一個消息的時候,我們就可以查看這條屬性從而將響應和請求匹配起來。如果我們接手到的消息的correlation_id是未知的,那就直接銷毀掉它,因為它不屬於我們的任何一條請求。
你也許會問,為什麽我們接收到未知消息的時候不拋出一個錯誤,而是要將它忽略掉?這是為了解決服務器端有可能發生的競爭情況。盡管可能性不大,但RPC服務器還是有可能在已將應答發送給我們但還未將確認消息發送給請求的情況下死掉。如果這種情況發生,RPC在重啟後會重新處理請求。這就是為什麽我們必須在客戶端優雅的處理重復響應,同時RPC也需要盡可能保持冪等性。
總結
我們的RPC如此工作:
- 當客戶端啟動的時候,它創建一個匿名獨享的回調隊列。
- 在RPC請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to屬性,另一個是設置唯一值的 correlation_id 屬性。
- 將請求發送到一個 rpc_queue 隊列中。
- RPC工作者(又名:服務器)等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工作並且將帶有執行結果的消息發送給reply_to字段指定的隊列。
- 客戶端等待回調隊列裏的數據。當有消息出現的時候,它會檢查correlation_id屬性。如果此屬性的值與請求匹配,將它返回給應用。
整合到一起
rpc_server.py代碼:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘rpc_queue‘)
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print " [.] fib(%s)" % (n,)
response = fib(n)
ch.basic_publish(exchange=‘‘,
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=‘rpc_queue‘)
print " [x] Awaiting RPC requests"
channel.start_consuming()
服務器端代碼相當簡單:
- (4)像往常一樣,我們建立連接,聲明隊列
- (11)我們聲明我們的fibonacci函數,它假設只有合法的正整數當作輸入。(別指望這個函數能處理很大的數值,函數遞歸你們都懂得...)
- (19)我們為 basic_consume 聲明了一個回調函數,這是RPC服務器端的核心。它執行實際的操作並且作出響應。
- (32)或許我們希望能在服務器上多開幾個線程。為了能將負載平均地分攤到多個服務器,我們需要將 prefetch_count 設置好。
rpc_client.py 代碼:
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange=‘‘,
routing_key=‘rpc_queue‘,
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)
客戶端代碼稍微有點難懂:
- (7)建立連接、通道並且為回復(replies)聲明獨享的回調隊列。
- (16)我們訂閱這個回調隊列,以便接收RPC的響應。
- (18)“on_response”回調函數對每一個響應執行一個非常簡單的操作,檢查每一個響應消息的correlation_id屬性是否與我們期待的一致,如果一致,將響應結果賦給self.response,然後跳出consuming循環。
- (23)接下來,我們定義我們的主要方法 call 方法。它執行真正的RPC請求。
- (24)在這個方法中,首先我們生成一個唯一的 correlation_id 值並且保存起來,‘on_response‘回調函數會用它來獲取符合要求的響應。
- (25)接下來,我們將帶有 reply_to 和 correlation_id 屬性的消息發布出去。
- (32)現在我們可以坐下來,等待正確的響應到來。
- (33)最後,我們將響應返回給用戶。
我們的RPC服務已經準備就緒了,現在啟動服務器端:
$ python rpc_server.py
[x] Awaiting RPC requests
運行客戶端,請求一個fibonacci隊列。
$ python rpc_client.py
[x] Requesting fib(30)
此處呈現的設計並不是實現RPC服務的唯一方式,但是他有一些重要的優勢:
- 如果RPC服務器運行的過慢的時候,你可以通過運行另外一個服務器端輕松擴展它。試試在控制臺中運行第二個 rpc_server.py 。
- 在客戶端,RPC請求只發送或接收一條消息。不需要像 queue_declare 這樣的異步調用。所以RPC客戶端的單個請求只需要一個網絡往返。
我們的代碼依舊非常簡單,而且沒有試圖去解決一些復雜(但是重要)的問題,如:
- 當沒有服務器運行時,客戶端如何作出反映。
- 客戶端是否需要實現類似RPC超時的東西。
- 如果服務器發生故障,並且拋出異常,應該被轉發到客戶端嗎?
- 在處理前,防止混入無效的信息(例如檢查邊界)
如果你想做一些實驗,你會發現rabbitmq-management plugin在觀測隊列方面是很有用處的。
(完整的rpc_client.py 和 rpc_server.py代碼)
遠程過程調用(RPC)