1. 程式人生 > >RabbitMQ之RPC實現

RabbitMQ之RPC實現

什麼是RPC?

RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的函式/方法,由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語義和傳達呼叫的資料。

為什麼RPC呢?就是無法在一個程序內,甚至一個計算機內通過本地呼叫的方式完成的需求,比如不同的系統間的通訊,甚至不同的組織間的通訊。由於計算能力需要橫向擴充套件,需要在多臺機器組成的叢集上部署應用,

RPC的協議有很多,比如最早的CORBA,Java RMI,Web Service的RPC風格,Hessian,Thrift,甚至Rest API。

RabbitMQ怎麼實現RPC呼叫?

Callback Queue

一般在RabbitMQ中做RPC是很簡單的。客戶端傳送請求訊息,伺服器回覆響應的訊息。為了接受響應的訊息,我們需要在請求訊息中傳送一個回撥佇列。可以使用預設的佇列(which is exclusive in the java client.):

callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish
("", "rpc_queue",props,message.getBytes()); // then code to read a response message from the callback_queue...

Message properties

AMQP協議為訊息預定義了一組14個屬性。

private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer
priority; private String correlationId; private String replyTo; private String expiration; private String messageId; private Date timestamp; private String type; private String userId; private String appId; private String clusterId;

大部分的屬性是很少使用的。除了以下幾種(其餘有興趣可以自行檢視):

  • deliveryMode: 標記訊息傳遞模式,2-訊息持久化,其他值-瞬態。
  • contentType:內容型別,用於描述編碼的mime-type. 例如經常為該屬性設定JSON編碼。
  • replyTo:應答,通用的回撥佇列名稱,
  • correlationId:關聯ID,方便RPC相應與請求關聯。

Correlation Id

在上述方法中為每個RPC請求建立一個回撥佇列。這是很低效的。幸運的是,一個解決方案:可以為每個客戶端建立一個單一的回撥佇列。

新的問題被提出,佇列收到一條回覆訊息,但是不清楚是那條請求的回覆。這是就需要使用correlationId屬性了。我們要為每個請求設定唯一的值。然後,在回撥佇列中獲取訊息,檢視這個屬性,關聯response和request就是基於這個屬性值的。如果我們看到一個未知的correlationId屬性值的訊息,可以放心的無視它——它不是我們傳送的請求。

你可能問道,為什麼要忽略回撥佇列中未知的資訊,而不是當作一個失敗?這是由於在伺服器端競爭條件的導致的。雖然不太可能,但是如果RPC伺服器在傳送給我們結果後,傳送請求反饋前就掛掉了,這有可能會發送未知correlationId屬性值的訊息。如果發生了這種情況,重啟RPC伺服器將會重新處理該請求。這就是為什麼在客戶端必須很好的處理重複響應,RPC應該是冪等的。

Summary

這裡寫圖片描述
RPC的處理流程:

  1. 當客戶端啟動時,建立一個匿名的回撥佇列。
  2. 客戶端為RPC請求設定2個屬性:replyTo,設定回撥佇列名字;correlationId,標記request。
  3. 請求被髮送到rpc_queue佇列中。
  4. RPC伺服器端監聽rpc_queue佇列中的請求,當請求到來時,伺服器端會處理並且把帶有結果的訊息傳送給客戶端。接收的佇列就是replyTo設定的回撥佇列。
  5. 客戶端監聽回撥佇列,當有訊息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。

Demo Code

這裡採用官網的一個例子來說明,RPC客戶端通過RPC呼叫伺服器來計算斐波那契額值。
首先是服務端的程式碼:

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String args[]) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitConfig.ip);
        factory.setPort(RabbitConfig.port);
        factory.setUsername(RabbitConfig.username);
        factory.setPassword(RabbitConfig.password);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
            String message = new String(delivery.getBody());
            int n = Integer.parseInt(message);
            System.out.println(" [.] fib("+message+")");
            String repsonse = ""+fib(n);
            channel.basicPublish("", props.getReplyTo(), replyProps, repsonse.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }
}

RPC客戶端:

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitConfig.ip);
        factory.setPort(RabbitConfig.port);
        factory.setUsername(RabbitConfig.username);
        factory.setPassword(RabbitConfig.password);

        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true,consumer);
    }

    public String call(String message) throws IOException,
            ShutdownSignalException, ConsumerCancelledException,
            InterruptedException {
        String response = null;
        String corrId = UUID.randomUUID().toString();

        BasicProperties props = new BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        channel.basicPublish("", requestQueueName, props, message.getBytes());

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if(delivery.getProperties().getCorrelationId().equals(corrId)){
                response = new String(delivery.getBody());
                break;
            }
        }

        return response;
    }

    public void close() throws Exception{
        connection.close();
    }

    public static void main(String args[]) throws Exception{
        RPCClient fibRpc = new RPCClient();
        System.out.println(" [x] Requesting fib(30)");
        String response = fibRpc.call("30");
        System.out.println(" [.] Got '"+response+"'");
        fibRpc.close();

    }
}

參考資料

歡迎支援《RabbitMQ實戰指南》以及關注微信公眾號:朱小廝的部落格。

相關推薦

RabbitMQRPC實現

什麼是RPC? RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的函式/方法,由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語義和傳達呼叫的資料。 為什麼RPC呢?就是無法在一個程序內,甚

pythonrpc實現遠端運算斐波那契函式然後返回本地

RPC模式 發一條訊息到遠端機器去執行,然後吧執行結果返回,這種模式叫rpc(remote procedure call)   運用rpc之前要先了解RabbitMQ:python之RabbitMQ訊息佇列 然後我們用rpc模擬一個服務端和客戶端,實現客戶端傳送數字,服

module05-1-基於RabbitMQ rpc實現的主機管理

not exit net 目錄 min .py 取值 event 機器 需求 題目:rpc命令端需求: 可以異步的執行多個命令 對多臺機器>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 4

基於RabbitMQ RPC實現的主機異步管理

fff 批量操作 result sel lis llb use exchange port README: 1、需求 - [ ] 利用RibbitMQ進行數據交互 - [ ] 可以對多臺服務器進行批量操作 - [ ] 執行命令後不等待命令的執行結果,而是直接讓輸入下一條命令

python--基於RabbitMQ rpc實現的主機管理

input == pub tag 3.4 num {} 配置 local 要求: 可以異步的執行多個命令對多臺機器>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_

譯: 6. RabbitMQ Spring AMQP RPC

Remote procedure call (RPC) 在第二篇教程中,我們學習瞭如何使用工作佇列在多個工作人員之間分配耗時的任務。 但是如果我們需要在遠端計算機上執行一個函式並等待結果呢?嗯,這是一個不同的故事。此模式通常稱為遠端過程呼叫或RPC。 在本教程中,我們將使用RabbitMQ構建RPC系統

RabbitMQ客戶端原始碼分析(九)RPC請求響應

宣告 Queue宣告、exchange宣告、bind等,這些都是通過同步RPC呼叫 channel.queueDeclare(queueName, durable

WebServiceRPC的(Axis2、CXF、Jersey、Hessian)4中實現方式

一、什麼是WebService Web service是一個平臺獨立的,低耦合的,自包含的、基於可程式設計的web的應用程式,可使用開放的XML(標準通用標記語言下的一個子集)標準來描述、釋出、發現、協調和配置這些應用程式,用於開發分散式的互操作的應用程式。 Web

RabbitMQRPC實現及其通信機制

pub elf tcl consumer 兩個 rabbit client margin result RabbitMQ中RPC的實現:客戶端發送請求消息,服務端回復響應消息,為了接受響應response,客戶端需要發送一個回調隊列的地址來接受響應,每條消息在發送的時候會帶

RabbitMQ學習(六).NET ClientRPC

6 RPC Remote procedure call implementation Python | Java | Ruby | PHP| C# Remote procedure call (RPC) (using the .NET client) 在第二

JmeterRPC協議指令碼開發實現

1、首先通過本文是通過jmeter的java請求方式實現RPC協議指令碼請求,具體關於java請求如果基於jmeter實現見:jmeter之java請求;如果還想了解rpc指令碼開發詳細見:rpc協議之hprose介面測試  。 2、分別需要建立兩個類 send類、Ap

openstack學習RPC服務實現分析

openstack中的服務主要有兩種:一種是rest服務,提供Rest API;一種RPC服務,提供RPC API。本文討論RPC服務的實現。Rest服務的實現在《openstack學習之各種API》有所涉及。 RPC服務其實就是一個RPC server,client(客

Glusterfsrpc模組原始碼分析(中)Glusterfs的rpc模組實現(3)

歡迎大家相互交流,共同提高技術。 第三節、rpc通訊過程分析 前面兩個小節分別對rpc服務端和客戶端的建立流程做了詳細的分析,也就是說rpc客戶端和伺服器端已經能夠進行正常的通訊了(rpc客戶端已經通過connect連結上rpc伺服器了),那麼這一小節主要根據一個實際

RabbitMQ(四):RPC實現

返回 直接 qos call true 解決 byte 停止 produce 原文:RabbitMQ(四):RPC的實現一、RPC   RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解

JavaWeb網上圖書商城完整項目--day02-6.ajax校驗功能頁面實現

ret code 需要 gis 提交 ima date else back 1 、現在我們要在regist.js中實現ajax的功能,使用用戶名到後臺查詢是否註冊,郵箱是否到後臺註冊,驗證碼是否正確的功能 我們來看regist.js的代碼 //該函數在html文檔加載完成

Django路 - 實現登錄隨機驗證碼

短信祝福 python 中間件 程序 檢測 登錄驗證碼是每個網站登錄時的基本標配,網上也有很多相應的文章, 但是從生成驗證碼到 應用到自己的網站上的全步驟,並沒有看到很多, 為了節約大家的時間,我把整體步驟寫下來, 即拿即用哈 1. 生成隨機驗證碼 隨機驗證碼代碼 2. 如何應用到你的dj

RabbitMQ學習筆記五:RabbitMQ優先級消息隊列

-c virtual 調用 itl 3.5 rri color images 執行順序 RabbitMQ優先級隊列註意點: 1、只有當消費者不足,不能及時進行消費的情況下,優先級隊列才會生效 2、RabbitMQ3.5以後才支持優先級隊列 代碼在博客:RabbitMQ學習筆

每天一個liunx命令3awk實現文本文件的抓取

logs -h 名稱 name $0 rep ray 表達式 指定 =============================================================================grep -h -s -E ‘HUAWEI_9000

Hadoop MapreduceWordCount實現

註意 com split gin 繼承 [] leo ring exce 1.新建一個WCMapper繼承Mapper public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritab

Java 簡單的RPC 實現

com java多態 http 技術 images object ice ima framework 借用了網上某大神的例子。。。。 目錄結構是這樣的。。。 RpcFramework 主要是兩個方法。一個是暴露服務,一個為引用服務。暴露服務的主要作用是聲明一個接口的實現類。