RabbitMQ實現RPC(java)
如果我們需要在遠端計算機上執行一個函式並等待結果,這種模式通常被稱為遠端過程呼叫或RPC。
在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個RPC伺服器。我們將建立一個返回斐波那契數字的模擬RPC服務。
整個過程示意圖如下:
客戶端將請求傳送至rpc_queue(我們定義的訊息佇列),然後等待響應;服務端獲取請求,並處理請求,然後將請求結果返回給佇列,客戶端得知請求被響應後獲取結果。
在結果被響應之前,客戶端是被阻塞的,主執行緒會等待RPC響應
如果每個RPC請求都建立一個回撥佇列。這是非常低效,我們建立一個單一的客戶端回撥佇列。
這引發了一個新的問題,在該佇列中收到回覆時,不清楚回覆屬於哪個請求。這就需要用到 correlationId屬性。我們為沒有請求設定唯一的correlationId值。然後,當我們在回撥佇列中收到一條訊息時,我們將獲取這個值,將響應與請求的進行correlationId匹配。如果我們一致就是我們需要的結果,否則就不是。
客戶端代RPCClient 碼如下:
package com.adtec.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
//建立一個連線和一個通道,併為回撥宣告一個唯一的'回撥'佇列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//定義一個臨時變數的接受佇列名
replyQueueName = channel.queueDeclare().getQueue();
}
//傳送RPC請求
public String call(String message) throws IOException, InterruptedException {
//生成一個唯一的字串作為回撥佇列的編號
String corrId = UUID.randomUUID().toString();
//傳送請求訊息,訊息使用了兩個屬性:replyto和correlationId
//服務端根據replyto返回結果,客戶端根據correlationId判斷響應是不是給自己的
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
.build();
//釋出一個訊息,requestQueueName路由規則
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//由於我們的消費者交易處理是在單獨的執行緒中進行的,因此我們需要在響應到達之前暫停主執行緒。
//這裡我們建立的 容量為1的阻塞佇列ArrayBlockingQueue,因為我們只需要等待一個響應。
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
// String basicConsume(String queue, boolean autoAck, Consumer callback)
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
//檢查它的correlationId是否是我們所要找的那個
if (properties.getCorrelationId().equals(corrId)) {
//如果是,則響應BlockingQueue
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
public void close() throws IOException {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
}
服務端代RPCServer 碼如下:
package rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
//具體處理方法
private static int fib(int n) {
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
//建立連線、通道,並宣告佇列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId()).build();
String response = "";
try {
String message = new String(body, "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
// 返回處理結果佇列
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
// 確認訊息,已經收到後面引數 multiple:是否批量.true:將一次性確認所有小於envelope.getDeliveryTag()的訊息。
channel.basicAck(envelope.getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC
// server owner thread
synchronized (this) {
this.notify();
}
}
}
};
//取消自動確認
boolean autoAck = false ;
channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {
}
}
}
}
測試時先執行服務端,再執行客戶端
為了方便觀察結果,最好將客戶端和服務端在不同workspace實現
客戶端結果
相關推薦
RabbitMQ實現RPC(java)
如果我們需要在遠端計算機上執行一個函式並等待結果,這種模式通常被稱為遠端過程呼叫或RPC。 在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個RPC伺服器。我們將建立一個返回斐波那契數字的模擬RPC服務。 整個過程示意圖如下:
Spring-rabbitmq 實現RPC 風格呼叫例項
1. 背景 專案中原來利用rabbitmq的RPC實現遠端方法呼叫,比較簡陋,封裝的比較差,而且topic等模式均為阻塞,如今我負責進行改造,今天重點看看如何利用spring-rabbitmq實現RPC風格的呼叫 簡單說來,RPC,主要目的是利用message實現遠端方
rabbitmq實現RPC例項
最近閱讀《RabbitMq實戰指南》瞭解了rpc(remote procedure call 遠端過程呼叫)的實現。下面是測試的例子: 服務端 /** * <p> * * rpc伺服器,1、開啟佇列,2、消費訊息,3、把response傳送到回撥佇列。
(八) RabbitMQ實戰教程(面向Java開發人員)之RabbitMQ實現非同步RPC
RabbitMQ實現非同步RPC RabbitMQ Java Client 服務端步驟: 1.服務端監聽一個佇列,監聽客戶端傳送過來的訊息 2.收到訊息之後呼叫RPC服務得到呼叫結果 3.從訊息屬性中獲取reply_to,correlation_i
RabbitMq初探——用隊列實現RPC
await 生產 通過 empty 分享 qos load lose ima rabbitmq構造rpc 前言 rpc——remote procedure call 遠程調用。在我接觸的使用過http協議、thrift框架來實現遠程調用。其實消息隊列rabbitmq也
Python-RabbitMQ消息隊列實現rpc
llb author bject roc read uuid tin rip rabbit 客戶端通過發送命令來調用服務端的某些服務,服務端把結果再返回給客戶端 這樣使得RabbitMQ的消息發送端和接收端都能發送消息 返回結果的時候需要指定另一個隊列 服務器端 # -
RPC使用rabbitmq實現
bsp 本地服務 font clear 自動 配置 tran contain 學習 兩天時間重寫公司架構在本地實現測試學習 雙向連接客戶端和服務端配置: 連接rabbitmq服務器 定義消息隊列 配置發送請求的模板:交換機、消息隊列。 配置監聽處理:監聽的隊列、消息轉換處
用Java實現RPC框架例項
一、RPC簡介 RPC,全稱為Remote Procedure Call,即遠端過程呼叫,它是一個計算機通訊協議。它允許像呼叫本地服務一樣呼叫遠端服務。它可以有不同的實現方式。如RMI(遠端方法呼叫)、Hessian、Http invoker等。另外,RPC是與語言無關的。 RPC示意圖
rabbitmq學習(四):利用rabbitmq實現遠端rpc呼叫
一、rabbitmq實現rpc呼叫的原理 ·rabbitmq實現rpc的原理是:客戶端向一個佇列中傳送訊息,並註冊一個回撥的佇列用於接收服務端返回的訊息,該訊息需要宣告一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到訊息(在需要時可以驗證correaltionId)後,
rabbitmq學習(四):利用rabbitmq實現遠程rpc調用
ext new urn trace cat ued 創建 exc false 一、rabbitmq實現rpc調用的原理 ·rabbitmq實現rpc的原理是:客戶端向一個隊列中發送消息,並註冊一個回調的隊列用於接收服務端返回的消息,該消息需要聲明一個叫做correaltio
Java實現RPC(服務物件使用註解並自動注入)
使用到的技術: 註解和反射機制 包掃描以及jar包掃描 CGlib動態代理 類似於spring框架的控制反轉依賴自動注入技術 目錄結構: RPCclass註解 @Retention(RetentionPolicy.RUNTIME) @Target(ElementTy
Java通過Hadoop實現RPC通訊簡單例項
一、定義server端程式碼 1.定義一個介面,該介面繼承org.apache.hadoop.ipc.VersionedProtocol介面 import org.apache.hadoop.ipc.VersionedProtocol; /** * 1.伺服器定義介面
RabbitMQ .NET訊息佇列使用入門(三)【MVC實現RPC例子】
每一個孤獨的靈魂都需要陪伴 RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之
Java基於String Boot、Thrift、Zookeeper實現RPC
Thrift的優勢是支援異構系統,相對於http協議效率較高,之前專案一直用的是dubbo,最近想了解一下thrift相關內容,以下是嘗試工程中的一些內容,做一些記錄 說明 1.程式碼基於JDK8、spring/boot實現 2.註冊發現採用zooke
用java序列化和阻塞IO模型實現RPC
RPC是遠端過程呼叫,對於java而言,就是兩個JVM通訊,一個JVM a想要呼叫另一個JVM b中的類。b把執行結果在傳送給a的過程。好,我們就是要來實現這個過程。 兩個介面: public interface IDiff { double diff(double a,d
java+rabbitMQ實現一對一聊天
原始碼地址: https://download.csdn.net/download/weixin_40461281/10321780 上一篇文章講了RabbitMQ的安裝 接下來介紹一下具體的應用 使用java + rabbitMQ實現聊天功能的demo , 非常有助於
RabbitMQ之RPC實現
什麼是RPC? RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的函式/方法,由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語義和傳達呼叫的資料。 為什麼RPC呢?就是無法在一個程序內,甚
RabbitMQ中RPC的實現及其通信機制
pub elf tcl consumer 兩個 rabbit client margin result RabbitMQ中RPC的實現:客戶端發送請求消息,服務端回復響應消息,為了接受響應response,客戶端需要發送一個回調隊列的地址來接受響應,每條消息在發送的時候會帶
簡單的RPC java實現
我也承認,RPC的名聲大噪之時是在2003年,那一個“衝擊波”病毒(Blaster Worm virus)襲捲全球的一年。而“衝擊波”正是用著RPC這把刀來敲開了遠端電腦的大門。當然RPC 有更多正面的應用,比如NFS、Web Service等等。 一、RPC的介紹
RabbitMQ系列-實現RPC非同步呼叫
使用Spring AMQP實現RPC非同步呼叫 示列 伺服器端 應用啟動類程式碼, import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org