RabbitMQ之RPC實現
什麼是RPC?
RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的函式/方法,由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語義和傳達呼叫的資料。
為什麼RPC呢?就是無法在一個程序內,甚至一個計算機內通過本地呼叫的方式完成的需求,比如不同的系統間的通訊,甚至不同的組織間的通訊。由於計算能力需要橫向擴充套件,需要在多臺機器組成的叢集上部署應用,
RPC的協議有很多,比如最早的CORBA,Java RMI,Web Service的RPC風格,Hessian,Thrift,甚至Rest API。
RabbitMQ怎麼實現RPC呼叫?
Callback Queue
一般在RabbitMQ中做RPC是很簡單的。客戶端傳送請求訊息,伺服器回覆響應的訊息。為了接受響應的訊息,我們需要在請求訊息中傳送一個回撥佇列。可以使用預設的佇列(which is exclusive in the java client.):
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish ("", "rpc_queue",props,message.getBytes());
// then code to read a response message from the callback_queue...
Message properties
AMQP協議為訊息預定義了一組14個屬性。
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
大部分的屬性是很少使用的。除了以下幾種(其餘有興趣可以自行檢視):
- deliveryMode: 標記訊息傳遞模式,2-訊息持久化,其他值-瞬態。
- contentType:內容型別,用於描述編碼的mime-type. 例如經常為該屬性設定JSON編碼。
- replyTo:應答,通用的回撥佇列名稱,
- correlationId:關聯ID,方便RPC相應與請求關聯。
Correlation Id
在上述方法中為每個RPC請求建立一個回撥佇列。這是很低效的。幸運的是,一個解決方案:可以為每個客戶端建立一個單一的回撥佇列。
新的問題被提出,佇列收到一條回覆訊息,但是不清楚是那條請求的回覆。這是就需要使用correlationId屬性了。我們要為每個請求設定唯一的值。然後,在回撥佇列中獲取訊息,檢視這個屬性,關聯response和request就是基於這個屬性值的。如果我們看到一個未知的correlationId屬性值的訊息,可以放心的無視它——它不是我們傳送的請求。
你可能問道,為什麼要忽略回撥佇列中未知的資訊,而不是當作一個失敗?這是由於在伺服器端競爭條件的導致的。雖然不太可能,但是如果RPC伺服器在傳送給我們結果後,傳送請求反饋前就掛掉了,這有可能會發送未知correlationId屬性值的訊息。如果發生了這種情況,重啟RPC伺服器將會重新處理該請求。這就是為什麼在客戶端必須很好的處理重複響應,RPC應該是冪等的。
Summary
RPC的處理流程:
- 當客戶端啟動時,建立一個匿名的回撥佇列。
- 客戶端為RPC請求設定2個屬性:replyTo,設定回撥佇列名字;correlationId,標記request。
- 請求被髮送到rpc_queue佇列中。
- RPC伺服器端監聽rpc_queue佇列中的請求,當請求到來時,伺服器端會處理並且把帶有結果的訊息傳送給客戶端。接收的佇列就是replyTo設定的回撥佇列。
- 客戶端監聽回撥佇列,當有訊息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。
Demo Code
這裡採用官網的一個例子來說明,RPC客戶端通過RPC呼叫伺服器來計算斐波那契額值。
首先是服務端的程式碼:
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String args[]) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConfig.ip);
factory.setPort(RabbitConfig.port);
factory.setUsername(RabbitConfig.username);
factory.setPassword(RabbitConfig.password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib("+message+")");
String repsonse = ""+fib(n);
channel.basicPublish("", props.getReplyTo(), replyProps, repsonse.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
}
RPC客戶端:
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConfig.ip);
factory.setPort(RabbitConfig.port);
factory.setUsername(RabbitConfig.username);
factory.setPassword(RabbitConfig.password);
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true,consumer);
}
public String call(String message) throws IOException,
ShutdownSignalException, ConsumerCancelledException,
InterruptedException {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if(delivery.getProperties().getCorrelationId().equals(corrId)){
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception{
connection.close();
}
public static void main(String args[]) throws Exception{
RPCClient fibRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibRpc.call("30");
System.out.println(" [.] Got '"+response+"'");
fibRpc.close();
}
}
參考資料
歡迎支援《RabbitMQ實戰指南》以及關注微信公眾號:朱小廝的部落格。
相關推薦
RabbitMQ之RPC實現
什麼是RPC? RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的函式/方法,由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語義和傳達呼叫的資料。 為什麼RPC呢?就是無法在一個程序內,甚
python之rpc實現遠端運算斐波那契函式然後返回本地
RPC模式 發一條訊息到遠端機器去執行,然後吧執行結果返回,這種模式叫rpc(remote procedure call) 運用rpc之前要先了解RabbitMQ:python之RabbitMQ訊息佇列 然後我們用rpc模擬一個服務端和客戶端,實現客戶端傳送數字,服
module05-1-基於RabbitMQ rpc實現的主機管理
not exit net 目錄 min .py 取值 event 機器 需求 題目:rpc命令端需求: 可以異步的執行多個命令 對多臺機器>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 4
基於RabbitMQ RPC實現的主機異步管理
fff 批量操作 result sel lis llb use exchange port README: 1、需求 - [ ] 利用RibbitMQ進行數據交互 - [ ] 可以對多臺服務器進行批量操作 - [ ] 執行命令後不等待命令的執行結果,而是直接讓輸入下一條命令
python--基於RabbitMQ rpc實現的主機管理
input == pub tag 3.4 num {} 配置 local 要求: 可以異步的執行多個命令對多臺機器>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_
譯: 6. RabbitMQ Spring AMQP 之 RPC
Remote procedure call (RPC) 在第二篇教程中,我們學習瞭如何使用工作佇列在多個工作人員之間分配耗時的任務。 但是如果我們需要在遠端計算機上執行一個函式並等待結果呢?嗯,這是一個不同的故事。此模式通常稱為遠端過程呼叫或RPC。 在本教程中,我們將使用RabbitMQ構建RPC系統
RabbitMQ客戶端原始碼分析(九)之RPC請求響應
宣告 Queue宣告、exchange宣告、bind等,這些都是通過同步RPC呼叫 channel.queueDeclare(queueName, durable
WebService之RPC的(Axis2、CXF、Jersey、Hessian)4中實現方式
一、什麼是WebService Web service是一個平臺獨立的,低耦合的,自包含的、基於可程式設計的web的應用程式,可使用開放的XML(標準通用標記語言下的一個子集)標準來描述、釋出、發現、協調和配置這些應用程式,用於開發分散式的互操作的應用程式。 Web
RabbitMQ中RPC的實現及其通信機制
pub elf tcl consumer 兩個 rabbit client margin result RabbitMQ中RPC的實現:客戶端發送請求消息,服務端回復響應消息,為了接受響應response,客戶端需要發送一個回調隊列的地址來接受響應,每條消息在發送的時候會帶
RabbitMQ學習(六).NET Client之RPC
6 RPC Remote procedure call implementation Python | Java | Ruby | PHP| C# Remote procedure call (RPC) (using the .NET client) 在第二
Jmeter之RPC協議指令碼開發實現
1、首先通過本文是通過jmeter的java請求方式實現RPC協議指令碼請求,具體關於java請求如果基於jmeter實現見:jmeter之java請求;如果還想了解rpc指令碼開發詳細見:rpc協議之hprose介面測試 。 2、分別需要建立兩個類 send類、Ap
openstack學習之RPC服務實現分析
openstack中的服務主要有兩種:一種是rest服務,提供Rest API;一種RPC服務,提供RPC API。本文討論RPC服務的實現。Rest服務的實現在《openstack學習之各種API》有所涉及。 RPC服務其實就是一個RPC server,client(客
Glusterfs之rpc模組原始碼分析(中)之Glusterfs的rpc模組實現(3)
歡迎大家相互交流,共同提高技術。 第三節、rpc通訊過程分析 前面兩個小節分別對rpc服務端和客戶端的建立流程做了詳細的分析,也就是說rpc客戶端和伺服器端已經能夠進行正常的通訊了(rpc客戶端已經通過connect連結上rpc伺服器了),那麼這一小節主要根據一個實際
RabbitMQ(四):RPC的實現
返回 直接 qos call true 解決 byte 停止 produce 原文:RabbitMQ(四):RPC的實現一、RPC RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解
JavaWeb網上圖書商城完整項目--day02-6.ajax校驗功能之頁面實現
ret code 需要 gis 提交 ima date else back 1 、現在我們要在regist.js中實現ajax的功能,使用用戶名到後臺查詢是否註冊,郵箱是否到後臺註冊,驗證碼是否正確的功能 我們來看regist.js的代碼 //該函數在html文檔加載完成
Django之路 - 實現登錄隨機驗證碼
短信祝福 python 中間件 程序 檢測 登錄驗證碼是每個網站登錄時的基本標配,網上也有很多相應的文章, 但是從生成驗證碼到 應用到自己的網站上的全步驟,並沒有看到很多, 為了節約大家的時間,我把整體步驟寫下來, 即拿即用哈 1. 生成隨機驗證碼 隨機驗證碼代碼 2. 如何應用到你的dj
RabbitMQ學習筆記五:RabbitMQ之優先級消息隊列
-c virtual 調用 itl 3.5 rri color images 執行順序 RabbitMQ優先級隊列註意點: 1、只有當消費者不足,不能及時進行消費的情況下,優先級隊列才會生效 2、RabbitMQ3.5以後才支持優先級隊列 代碼在博客:RabbitMQ學習筆
每天一個liunx命令3之awk實現文本文件的抓取
logs -h 名稱 name $0 rep ray 表達式 指定 =============================================================================grep -h -s -E ‘HUAWEI_9000
Hadoop Mapreduce之WordCount實現
註意 com split gin 繼承 [] leo ring exce 1.新建一個WCMapper繼承Mapper public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritab
Java 簡單的RPC 實現
com java多態 http 技術 images object ice ima framework 借用了網上某大神的例子。。。。 目錄結構是這樣的。。。 RpcFramework 主要是兩個方法。一個是暴露服務,一個為引用服務。暴露服務的主要作用是聲明一個接口的實現類。