RabbitMQ 官方NET教程(六)【RPC】
在第二個教程中,我們學習瞭如何使用Work Queues
在多個工作者之間分配耗時的任務。
但是如果我們需要在遠端計算機上執行功能並等待結果怎麼辦? 那是一個不同的模式。 此模式通常稱為遠端過程呼叫或RPC
。
在本教程中,我們將使用RabbitMQ
構建一個RPC
系統:一個客戶端和一個可擴充套件的RPC
伺服器。由於我們沒有任何值得分發的耗時任務,我們將建立一個返回斐波納契數字的虛擬RPC
服務。
客戶端
為了說明如何使用RPC服務,我們將建立一個簡單的客戶端類。 它將公開一個名為call
的方法,該方法傳送RPC請求並阻塞,直到收到應答:
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
關於RPC的注意點
雖然RPC是一個很常見的計算模式,但它經常被批評。 當程式設計師不知道函式呼叫是本地函式還是緩慢的RPC時,會出現問題。 這樣的混亂導致了一個不可預知的系統,並增加了除錯的不必要的複雜性。濫用RPC可能導致不可維護的義大利麵條程式碼,而不是簡化軟體。 銘記這一點,請考慮以下建議: 確保顯而易見哪個函式呼叫是本地的,哪個是遠端的。 記錄您的系統。 使元件之間的依賴關係清晰。 處理錯誤情況。 當RPC伺服器長時間停機時,客戶端應該如何反應? 當有疑問避免RPC。 如果可以的話,你應該使用非同步管道 - 而不是類似RPC的阻塞,結果被非同步推送到下一個計算階段。
回撥佇列
一般來說RPC
在RabbitMQ
上很容易。客戶端傳送請求訊息,伺服器回覆響應訊息。 為了收到一個響應,我們需要傳送一個附帶callback
佇列地址的請求:
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "" ,
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
// ... then code to read a response message from the callback_queue ...
Message屬性
AMQP 0-9-1協議預先定義了一組隨附訊息的14個屬性。 大多數屬性很少使用,除了以下內容:
deliveryMode:將訊息標記為永續性(值為2)或transient(任何其他值)。 您可能會從第二個教程中記住此屬性。
contentType:用於描述mime型別的編碼。 例如對於經常使用的JSON編碼,將此屬性設定為:application / json是一個很好的做法。
replyTo:通常用來命名一個回撥佇列。
correlationId:用於將RPC響應與請求相關聯。
Correlation Id
在上面提出的方法中,我們建議為每個RPC請求建立一個回撥佇列。這是非常低效的,但幸運的是有一個更好的方法 - 讓我們為每個客戶端建立一個回撥佇列。
這引發了一個新的問題,在該佇列中收到響應,響應所屬的請求不清楚。那就是使用correlationId
屬性。我們將為每個請求設定唯一的值。之後,當我們在回撥佇列中收到一條訊息時,我們將檢視此屬性,並且基於此,我們將能夠將響應與請求相匹配。如果我們看到一個未知的correlationId
值,我們可能會安全地丟棄該訊息 - 它不屬於我們的請求。
您可能會問,為什麼我們應該忽略回撥佇列中的未知訊息,而不是出現錯誤?這是由於在伺服器端發生競爭條件的可能性。儘管不太可能,RPC伺服器可能會在傳送給我們的答案之後,但在傳送請求的確認訊息之前死亡。如果發生這種情況,重新啟動的RPC伺服器將再次處理該請求。這就是為什麼在客戶端上,我們必須優雅地處理這些重複的響應,而且RPC
理應上是冪等的。
總結
我們的RPC將像這樣工作:
當客戶端啟動時,它建立一個匿名獨佔回撥佇列。
對於RPC請求,客戶端傳送一個具有兩個屬性的訊息:replyTo,它被設定為回撥佇列和correlationId,correlationId被設定為每個請求的唯一值。
請求被髮送到rpc_queue佇列。
RPC worker(aka:server)正在等待佇列上的請求。 當請求出現時,它將執行該作業,並使用replyTo欄位中的佇列將結果傳送回客戶端。
客戶端等待回撥佇列中的資料。 當資訊出現時,它會檢查correlationId屬性。 如果它與請求中的值相匹配,則返回對應用程式的響應。
完整示例
斐波納契任務:
private static int fib(int n)
{
if (n == 0 || n == 1) return n;
return fib(n - 1) + fib(n - 2);
}
我們宣告我們的fibonacci函式。 它只假定有效的正整數輸入。 (不要指望這個工作者可以為大數字量工作,這可能是最慢的遞迴實現可能)。
我們的RPC伺服器RPCServer.cs
的程式碼如下所示:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class RPCServer
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "rpc_queue",
noAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = null;
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
///
/// Assumes only valid positive integer input.
/// Don't expect this one to work for big numbers, and it's
/// probably the slowest recursive implementation possible.
///
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return fib(n - 1) + fib(n - 2);
}
}
伺服器程式碼相當簡單:
像往常一樣,我們開始建立連線,通道和宣告佇列。
我們可能想要執行多個伺服器程序。 為了在多個伺服器上平均分配負載,我們需要在channel.basicQos中設定prefetchCount設定。
我們使用basicConsume訪問佇列。 然後我們註冊一個傳遞處理程式,我們在其中進行工作併發迴響應。
我們的RPC客戶機的程式碼RPCClient.cs
:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
class RPCClient
{
private IConnection connection;
private IModel channel;
private string replyQueueName;
private QueueingBasicConsumer consumer;
public RPCClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue: replyQueueName,
noAck: true,
consumer: consumer);
}
public string Call(string message)
{
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
while(true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
if(ea.BasicProperties.CorrelationId == corrId)
{
return Encoding.UTF8.GetString(ea.Body);
}
}
}
public void Close()
{
connection.Close();
}
}
class RPC
{
public static void Main()
{
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
}
}
客戶端程式碼涉及:
我們建立一個連線和通道,並宣告一個獨佔的'callback'佇列作為回覆。
我們訂閱'callback'佇列,以便我們可以接收RPC響應。
我們的call方法使得實際的RPC請求。
在這裡,我們首先生成一個唯一的correlationId數字並儲存它 - while迴圈將使用此值來捕獲適當的響應。
接下來,我們釋出請求訊息,其中包含兩個屬性:replyTo和correlationId。
在這一點上,我們可以坐下來等待適當的響應到達。
while迴圈正在做一個非常簡單的工作,對於每個響應訊息,它檢查correlationId是否是我們正在尋找的。 如果是這樣,它會儲存響應。
最後,我們將響應返回給使用者。
使客戶端請求:
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
我們的RPC服務現在已經準備就緒。 我們可以啟動伺服器:
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
執行客戶端請求fibonacci數字:
cd RPCClient
dotnet run
# => [x] Requesting fib(30)
這裡提出的設計不是RPC服務的唯一可能的實現,而是具有一些重要的優點:
如果RPC伺服器太慢,可以通過執行另一個RPC伺服器進行擴充套件。 嘗試在新的控制檯中執行第二個RPCServer。
在客戶端,RPC需要傳送和接收一條訊息。 不需要同步呼叫,如queueDeclare。 因此,RPC客戶端只需要一個網路往返單個RPC請求。
我們的程式碼仍然非常簡單,不會嘗試解決更復雜(但重要的)問題,如:
如果沒有伺服器執行,客戶端應該如何反應?
客戶端是否需要RPC的某種超時時間?
如果伺服器發生故障並引發異常,應該將其轉發給客戶端?
在處理之前防止無效的傳入訊息(例如檢查邊界,型別)。