1. 程式人生 > >RabbitMQ 官方NET教程(六)【RPC】

RabbitMQ 官方NET教程(六)【RPC】

在第二個教程中,我們學習瞭如何使用Work Queues在多個工作者之間分配耗時的任務。

但是如果我們需要在遠端計算機上執行功能並等待結果怎麼辦? 那是一個不同的模式。 此模式通常稱為遠端過程呼叫或RPC

在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個可擴充套件的RPC伺服器。由於我們沒有任何值得分發的耗時任務,我們將建立一個返回斐波納契數字的虛擬RPC服務。

客戶端

為了說明如何使用RPC服務,我們將建立一個簡單的客戶端類。 它將公開一個名為call的方法,該方法傳送RPC請求並阻塞,直到收到應答:

var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();

關於RPC的注意點

 雖然RPC是一個很常見的計算模式,但它經常被批評。 當程式設計師不知道函式呼叫是本地函式還是緩慢的RPC時,會出現問題。 這樣的混亂導致了一個不可預知的系統,並增加了除錯的不必要的複雜性。濫用RPC可能導致不可維護的義大利麵條程式碼,而不是簡化軟體。

 銘記這一點,請考慮以下建議:

     確保顯而易見哪個函式呼叫是本地的,哪個是遠端的。
     記錄您的系統。 使元件之間的依賴關係清晰。
     處理錯誤情況。 當RPC伺服器長時間停機時,客戶端應該如何反應?

 當有疑問避免RPC。 如果可以的話,你應該使用非同步管道 - 而不是類似RPC的阻塞,結果被非同步推送到下一個計算階段。

回撥佇列

一般來說RPCRabbitMQ上很容易。客戶端傳送請求訊息,伺服器回覆響應訊息。 為了收到一個響應,我們需要傳送一個附帶callback佇列地址的請求:

var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: ""
, routingKey: "rpc_queue", basicProperties: props, body: messageBytes); // ... then code to read a response message from the callback_queue ...

Message屬性

 AMQP 0-9-1協議預先定義了一組隨附訊息的14個屬性。 大多數屬性很少使用,除了以下內容:

     deliveryMode:將訊息標記為永續性(值為2)或transient(任何其他值)。 您可能會從第二個教程中記住此屬性。
     contentType:用於描述mime型別的編碼。 例如對於經常使用的JSON編碼,將此屬性設定為:application / json是一個很好的做法。
     replyTo:通常用來命名一個回撥佇列。
     correlationId:用於將RPC響應與請求相關聯。

Correlation Id

在上面提出的方法中,我們建議為每個RPC請求建立一個回撥佇列。這是非常低效的,但幸運的是有一個更好的方法 - 讓我們為每個客戶端建立一個回撥佇列。

這引發了一個新的問題,在該佇列中收到響應,響應所屬的請求不清楚。那就是使用correlationId屬性。我們將為每個請求設定唯一的值。之後,當我們在回撥佇列中收到一條訊息時,我們將檢視此屬性,並且基於此,我們將能夠將響應與請求相匹配。如果我們看到一個未知的correlationId值,我們可能會安全地丟棄該訊息 - 它不屬於我們的請求。

您可能會問,為什麼我們應該忽略回撥佇列中的未知訊息,而不是出現錯誤?這是由於在伺服器端發生競爭條件的可能性。儘管不太可能,RPC伺服器可能會在傳送給我們的答案之後,但在傳送請求的確認訊息之前死亡。如果發生這種情況,重新啟動的RPC伺服器將再次處理該請求。這就是為什麼在客戶端上,我們必須優雅地處理這些重複的響應,而且RPC理應上是冪等的。

總結

這裡寫圖片描述
我們的RPC將像這樣工作:

 當客戶端啟動時,它建立一個匿名獨佔回撥佇列。
 對於RPC請求,客戶端傳送一個具有兩個屬性的訊息:replyTo,它被設定為回撥佇列和correlationId,correlationId被設定為每個請求的唯一值。
 請求被髮送到rpc_queue佇列。
 RPC worker(aka:server)正在等待佇列上的請求。 當請求出現時,它將執行該作業,並使用replyTo欄位中的佇列將結果傳送回客戶端。
 客戶端等待回撥佇列中的資料。 當資訊出現時,它會檢查correlationId屬性。 如果它與請求中的值相匹配,則返回對應用程式的響應。

完整示例

斐波納契任務:

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

我們宣告我們的fibonacci函式。 它只假定有效的正整數輸入。 (不要指望這個工作者可以為大數字量工作,這可能是最慢的遞迴實現可能)。

我們的RPC伺服器RPCServer.cs的程式碼如下所示:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
              exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: "rpc_queue",
              noAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
            {
                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                      basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    /// 

    /// Assumes only valid positive integer input.
    /// Don't expect this one to work for big numbers, and it's
    /// probably the slowest recursive implementation possible.
    /// 

    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }

        return fib(n - 1) + fib(n - 2);
    }
}

伺服器程式碼相當簡單:

 像往常一樣,我們開始建立連線,通道和宣告佇列。
 我們可能想要執行多個伺服器程序。 為了在多個伺服器上平均分配負載,我們需要在channel.basicQos中設定prefetchCount設定。
 我們使用basicConsume訪問佇列。 然後我們註冊一個傳遞處理程式,我們在其中進行工作併發迴響應。

我們的RPC客戶機的程式碼RPCClient.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class RPCClient
{
    private IConnection connection;
    private IModel channel;
    private string replyQueueName;
    private QueueingBasicConsumer consumer;

    public RPCClient()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new QueueingBasicConsumer(channel);
        channel.BasicConsume(queue: replyQueueName,
                             noAck: true,
                             consumer: consumer);
    }

    public string Call(string message)
    {
        var corrId = Guid.NewGuid().ToString();
        var props = channel.CreateBasicProperties();
        props.ReplyTo = replyQueueName;
        props.CorrelationId = corrId;

        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: "",
                             routingKey: "rpc_queue",
                             basicProperties: props,
                             body: messageBytes);

        while(true)
        {
            var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
            if(ea.BasicProperties.CorrelationId == corrId)
            {
                return Encoding.UTF8.GetString(ea.Body);
            }
        }
    }

    public void Close()
    {
        connection.Close();
    }
}

class RPC
{
    public static void Main()
    {
        var rpcClient = new RPCClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");
        Console.WriteLine(" [.] Got '{0}'", response);

        rpcClient.Close();
    }
}

客戶端程式碼涉及:

 我們建立一個連線和通道,並宣告一個獨佔的'callback'佇列作為回覆。
 我們訂閱'callback'佇列,以便我們可以接收RPC響應。
 我們的call方法使得實際的RPC請求。
 在這裡,我們首先生成一個唯一的correlationId數字並儲存它 - while迴圈將使用此值來捕獲適當的響應。
 接下來,我們釋出請求訊息,其中包含兩個屬性:replyTo和correlationId。
 在這一點上,我們可以坐下來等待適當的響應到達。
 while迴圈正在做一個非常簡單的工作,對於每個響應訊息,它檢查correlationId是否是我們正在尋找的。 如果是這樣,它會儲存響應。
 最後,我們將響應返回給使用者。

使客戶端請求:

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

我們的RPC服務現在已經準備就緒。 我們可以啟動伺服器:

cd RPCServer
dotnet run
# => [x] Awaiting RPC requests

執行客戶端請求fibonacci數字:

cd RPCClient
dotnet run
# => [x] Requesting fib(30)

這裡提出的設計不是RPC服務的唯一可能的實現,而是具有一些重要的優點:

 如果RPC伺服器太慢,可以通過執行另一個RPC伺服器進行擴充套件。 嘗試在新的控制檯中執行第二個RPCServer。
 在客戶端,RPC需要傳送和接收一條訊息。 不需要同步呼叫,如queueDeclare。 因此,RPC客戶端只需要一個網路往返單個RPC請求。

我們的程式碼仍然非常簡單,不會嘗試解決更復雜(但重要的)問題,如:

 如果沒有伺服器執行,客戶端應該如何反應?
 客戶端是否需要RPC的某種超時時間?
 如果伺服器發生故障並引發異常,應該將其轉發給客戶端?
 在處理之前防止無效的傳入訊息(例如檢查邊界,型別)。