1. 程式人生 > >RabbitMQ(四):RPC的實現

RabbitMQ(四):RPC的實現

返回 直接 qos call true 解決 byte 停止 produce

原文:RabbitMQ(四):RPC的實現

一、RPC

  RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。有很多方式可以實現,譬如UNIX RPC、REST API、WCF和SOAP。這些傳統的RPC實現方法有共同之處:那就是客戶端和服務器端緊密相連,客戶端直接連接上服務器,發送一個請求,然後就停下來等待服務器的應答。

  這種點對點的性質模式有很多好處,它使得在小範圍內的拓撲變得簡單。但是當有眾多服務器的時候,客戶端如何發現在那臺服務器上可以找到其他想要的服務就變的麻煩,SOAP和大多數的企業RPC已經采用復雜的補充協議和服務目錄,但也帶來了額外的復雜度和眾多故障點。

  但是,用RabbitMQ來實現RPC可以無需關心由那臺服務器來處理,也不必擔心服務器奔潰,只需要簡單的發送消息,然後等待響應即可。一般接觸RabbitMQ的都是用發後即忘模型,用於發送郵件等通知或者處理其他並行處理事件,也就是AMQP的消息是單向的。如何才能讓服務器將處理結果返回給原始的客戶端呢?

二、消息應答和私有隊列

  RabbitMQ有一個優雅的解決方案:使用消息來發回應答。在每個AMQP消息頭裏有個字段reply_to.消息的生產者可以通過該字段來確定隊列的名稱,並監聽應答隊列等待應答。然後接收消息的RPC服務器能偶檢查reply_to字段,並創建包含應答內容的新的消息,並以隊列名稱為路由鍵,通過應答隊列將處理結果發回給生產者。這裏我們不需要創建應答隊列的名字也不需要將應答隊列綁定到交換器上,這是因為沒有聲明隊列的名稱RabbitMQ會自動申明,消息發布到RabbitMQ在沒有指名交換器的時候,RabbitMQ就會讓位目的地是應答隊列,而路由鍵就是應答隊列名稱。

  所以RabbitMQ實現RPC需要比一般的消息通信多以下幾個步驟:

  1. 生產者創建一個應答隊列,並監聽該隊列。
  2. 生產者為消息頭中的Reply_to和CorrelationId字段賦值。reply_to是應答隊列的名稱,CorrelationId是相關標識由消費者返回後對比確認是返回我們的結果。
  3. 消費者返回生產者發送的消息頭,並且不需要綁定交換器,並將Reply_to參數作為路由鍵發送消息到應答隊列。

三、自己實現簡單的RPC

  其實簡單的講就是生產者在發送消息後接收消息,消費者在接受消息後發送消息,生產者多了一步接收處理消息,消費者多了一步發送消息。我這裏簡化了一些操作,爭取用最少的代碼實現,具體代碼如下:

  生產者:

private static void MySelfRPCProducer()
{
    var conn_factory = new ConnectionFactory(){HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672};
    using (var conn = conn_factory.CreateConnection())
    {
        using (var channel = conn.CreateModel())
        {
            IBasicProperties pro = channel.CreateBasicProperties();
            pro.ReplyTo = channel.QueueDeclare().QueueName;//創建應答隊列並返回隊列名稱,這個方法創建的隊列exclusive和auto_delete都是true,這樣可以確保沒有人能竊取信息
            pro.ContentType = "text/plain";
            string corrId = Guid.NewGuid().ToString();
            pro.CorrelationId = corrId;

            channel.BasicPublish("", "rpc_queue", pro, Encoding.UTF8.GetBytes("小黃"));
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ea, ch) =>
            {
                //比較CorrelationId確認是返回的我們的消息
                if (ch.BasicProperties.CorrelationId == corrId)
                {
                    //處理返回結果
                    string msg = Encoding.UTF8.GetString(ch.Body);
                    Console.WriteLine(msg);
                }
            };
            string consumer_tag = channel.BasicConsume(pro.ReplyTo, true, consumer);//監聽應答隊列
            channel.BasicCancel(consumer_tag);
        }
    }
    Console.ReadLine();
}

  消費者:

private static void MySelfRPCCousmer()
{
    var conn_factory = new ConnectionFactory(){HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672};
    using (var conn = conn_factory.CreateConnection())
    {
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare("rpc_queue", false, false, false, null);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicQos(0, 1, false);
            consumer.Received += (ea, ch) =>
            {
                string msg = Encoding.UTF8.GetString(ch.Body);
                Console.WriteLine("接收到消息:" + msg);
                //發送處理結果
                channel.BasicPublish("", ch.BasicProperties.ReplyTo, ch.BasicProperties, Encoding.UTF8.GetBytes(msg + "給我回電話了"));
                channel.BasicAck(ch.DeliveryTag, false);
            };
            string consumer_tag = channel.BasicConsume("rpc_queue", false, consumer);
            Console.ReadLine();//這裏先停止運行下面的代碼,因為需要持續監聽,信道斷開就監聽不了了
            channel.BasicCancel(consumer_tag);
        }
    }
}

四、RabbitMQ封裝好的RPC

  其實RabbitMQ已經封裝好了RPC相應的對象,分別是SimpleRpcClient和SimpleRpcServer。客戶端在初始化SimpleRpcClient後主要可以通過Call方法發送消息並返回服務端處理結果。服務端的SimpleRpcServer內部定義了很多虛方法,具體的消息處理是我們自己決定的,所以需要繼承SimpleRpcServer後實現相應方法,通過實現重寫HandleSimpleCall方法可以返回給客戶端數據。具體代碼如下所示:

  客戶端:

private static void RabbitMQRPCProducer()
{
    var conn_factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 };
    using (var conn = conn_factory.CreateConnection())
    {
        using (var channel = conn.CreateModel())
        {
            //創建client的rpc
            SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "rpc_queue"));
            bool flag = true;
            var sendmsg = "";
            while (flag)
            {
                Console.WriteLine("請輸入要發送的消息");
                sendmsg = Console.ReadLine();
                if (string.IsNullOrWhiteSpace(sendmsg))
                {
                    Console.Write("請輸入消息");
                    continue;
                }
                var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg));
                Console.WriteLine(Encoding.UTF8.GetString(msg));
            }
            Console.ReadKey();
        }
    }
}

  服務端:

private static void RabbitMQRPCCousmer()
{

    var conn_factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 };
    using (var conn = conn_factory.CreateConnection())
    {
        //創建返回一個新的頻道
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare("rpc_queue", false, false, false, null);//創建一個rpc queue
            SimpleRpcServer rpc = new MySimpleRpcServer(new Subscription(channel, "rpc_queue"));
            Console.WriteLine("服務端啟動成功");
            rpc.MainLoop(); Console.ReadKey();
        }
    }
}

  繼承實現方法:

class MySimpleRpcServer : SimpleRpcServer
{
    public MySimpleRpcServer(Subscription subscription) : base(subscription)
    {
    }
    /// <summary>
    /// 執行完成後進行回調
    /// </summary>
    public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
    {
        replyProperties = null;
        return Encoding.UTF8.GetBytes($"給{Encoding.UTF8.GetString(body)}發送短信成功");
    }
}

五、小結

  以上就是RabbitMQ對於RPC的最簡單的實現,與大家共勉。

RabbitMQ(四):RPC的實現