1. 程式人生 > >輕鬆搞定RabbitMQ(七)——遠端過程呼叫RPC

輕鬆搞定RabbitMQ(七)——遠端過程呼叫RPC

翻譯:http://www.rabbitmq.com/tutorials/tutorial-six-java.html
在第二篇博文中,我們已經瞭解到瞭如何使用工作佇列來向多個消費者分散耗時任務。
但是付過我們需要在遠端電腦上執行一個方法然後等待結果,該怎麼辦?這是不同的需求。這個模式通常叫做RPC。

本文我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個可擴充套件的RPC伺服器端。由於我們沒有任何真實的耗時任務需要分配,所以我們將建立一個虛擬的RPC服務,可以返回斐波納契數列。

Client interface(客戶端介面)

為了說明RPC服務可以使用,我們建立一個簡單的客戶端類。暴露一個方法——傳送RPC請求,然後阻塞直到獲得結果。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

Callback queue(回撥佇列)

一般在RabbitMQ中做RPC是很簡單的。客戶端傳送請求訊息,伺服器回覆響應的訊息。為了接受響應的訊息,我們需要在請求訊息中傳送一個回撥佇列。可以用預設的佇列(僅限java客戶端)。試一試吧:
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

Message properties(訊息屬性)

AMQP協議為訊息預定義了一組14個屬性。大部分的屬性是很少使用的。除了一下幾種:
    • deliveryMode:標記訊息傳遞模式,2-訊息持久化,其他值-瞬態。在第二篇文章中還提到過。
    • contentType:內容型別,用於描述編碼的mime-type。例如經常為該屬性設定JSON編碼。
    • replyTo:應答,通用的回撥佇列名稱
    • correlationId:關聯ID,方便RPC響應與請求關聯
       我們需要新增一個新的匯入
import com.rabbitmq.client.AMQP.BasicProperties;

Correlation Id

       在上述方法中為每個RPC請求建立一個回撥佇列。這是很低效的。幸運的是,一個解決方案:可以為每個客戶端建立一個單一的回撥佇列。
       新的問題被提出,佇列收到一條回覆訊息,但是不清楚是那條請求的回覆。這是就需要使用correlationId屬性了。我們要為每個請求設定唯一的值。然後,在回撥佇列中獲取訊息,看看這個屬性,關聯response和request就是基於這個屬性值的。如果我們看到一個未知的correlationId屬性值的訊息,可以放心的無視它——它不是我們傳送的請求。
       你可能問道,為什麼要忽略回撥佇列中未知的資訊,而不是當作一個失敗?這是由於在伺服器端競爭條件的導致的。雖然不太可能,但是如果RPC伺服器在傳送給我們結果後,傳送請求反饋前就掛掉了,這有可能會發送未知correlationId屬性值的訊息。如果發生了這種情況,重啟RPC伺服器將會重新處理該請求。這就是為什麼在客戶端必須很好的處理重複響應,RPC應該是冪等的。

Summary(總結)




       我們的RPC的處理流程:

    1. 當客戶端啟動時,建立一個匿名的回撥佇列。
    2. 客戶端為RPC請求設定2個屬性:replyTo,設定回撥佇列名字;correlationId,標記request。
    3. 請求被髮送到rpc_queue佇列中。
    4. RPC伺服器端監聽rpc_queue佇列中的請求,當請求到來時,伺服器端會處理並且把帶有結果的訊息傳送給客戶端。接收的佇列就是replyTo設定的回撥佇列。
    5. 客戶端監聽回撥佇列,當有訊息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。

完整的例項

RPC伺服器端(RPCServer.java)

/**
 * RPC伺服器端
 * 
 * @author arron
 * @date 2015年9月30日 下午3:49:01
 * @version 1.0
 */
public class RPCServer {
	
	private static final String RPC_QUEUE_NAME = "rpc_queue";

	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		// 設定MabbitMQ所在主機ip或者主機名
		factory.setHost("127.0.0.1");
		// 建立一個連線
		Connection connection = factory.newConnection();
		// 建立一個頻道
		Channel channel = connection.createChannel();

		//宣告佇列
		channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

		//限制:每次最多給一個消費者傳送1條訊息
		channel.basicQos(1);

		//為rpc_queue佇列建立消費者,用於處理請求
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

		System.out.println(" [x] Awaiting RPC requests");

		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();

			//獲取請求中的correlationId屬性值,並將其設定到結果訊息的correlationId屬性中
			BasicProperties props = delivery.getProperties();
			BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
			//獲取回撥佇列名字
			String callQueueName = props.getReplyTo();
			
			String message = new String(delivery.getBody(),"UTF-8");
			
			System.out.println(" [.] fib(" + message + ")");
			
			//獲取結果
			String response = "" + fib(Integer.parseInt(message));
			//先發送回調結果
			channel.basicPublish("", callQueueName, replyProps,response.getBytes());
			//後手動傳送訊息反饋
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}

	/**
	 * 計算斐波列其數列的第n項
	 * 
	 * @param n
	 * @return
	 * @throws Exception
	 */
	private static int fib(int n) throws Exception {
		if (n < 0)
			throw new Exception("引數錯誤,n必須大於等於0");
		if (n == 0)
			return 0;
		if (n == 1)
			return 1;
		return fib(n - 1) + fib(n - 2);
	}
}
RPC客戶端(RPCClient.java):
/** 
 * 
 * @author arron
 * @date 2015年9月30日 下午3:44:43 
 * @version 1.0 
 */
public class RPCClient {

	private static final String RPC_QUEUE_NAME = "rpc_queue";
	
	private Connection connection;
	private Channel channel;
	private String replyQueueName;
	private QueueingConsumer consumer;
	
	public RPCClient() throws Exception {
	    ConnectionFactory factory = new ConnectionFactory();
		// 設定MabbitMQ所在主機ip或者主機名
		factory.setHost("127.0.0.1");
		// 建立一個連線
		connection = factory.newConnection();
		// 建立一個頻道
		channel = connection.createChannel();
		
		//宣告佇列
		channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
		
		//為每一個客戶端獲取一個隨機的回撥佇列
	    replyQueueName = channel.queueDeclare().getQueue();
	    //為每一個客戶端建立一個消費者(用於監聽回撥佇列,獲取結果)
	    consumer = new QueueingConsumer(channel);
	    //消費者與佇列關聯
	    channel.basicConsume(replyQueueName, true, consumer);
	}
	
	/**
	 * 獲取斐波列其數列的值
	 * 
	 * @param message
	 * @return
	 * @throws Exception
	 */
	public String call(String message) throws Exception{
		String response = null;
	    String corrId = java.util.UUID.randomUUID().toString();

	    //設定replyTo和correlationId屬性值
	    BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();

	    //傳送訊息到rpc_queue佇列
	    channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());

	    while (true) {
	        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
	        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
	            response = new String(delivery.getBody(),"UTF-8");
	            break;
	        }
	    }

	    return response; 
	}

	public static void main(String[] args) throws Exception {
		RPCClient fibonacciRpc = new RPCClient();   
		String result = fibonacciRpc.call("4");
		System.out.println( "fib(4) is " + result);
	}
}
輸出結果:
fib(4) is 3

相關推薦

輕鬆RabbitMQ——遠端過程呼叫RPC

翻譯:http://www.rabbitmq.com/tutorials/tutorial-six-java.html在第二篇博文中,我們已經瞭解到瞭如何使用工作佇列來向多個消費者分散耗時任務。但是付過我們需要在遠端電腦上執行一個方法然後等待結果,該怎麼辦?這是不同的需求。

輕鬆RabbitMQ——工作佇列之訊息分發機制

       上一篇博文中簡單介紹了一下RabbitMQ的基礎知識,並寫了一個經典語言入門程式——HelloWorld。本篇博文中我們將會建立一個工作佇列用來在工作者(consumer)間分發耗時任務。同樣是翻譯的官網例項。 工作佇列        在前一篇博文中,我們完

輕松RabbitMQ——路由選擇

byte[] view 轉發器 ews 磁盤空間 表示 info 直接 net 轉自 http://blog.csdn.net/xiaoxian8023/article/details/48733249 翻譯地址:http://www.rabbitmq.com/tutori

【筆記】Mybatis高階查詢--儲存過程呼叫

以下例子展示Mybatis儲存過程呼叫,與普通查詢基本一樣,只是在配置對映時要加上statementType=“CALLABLE”,由於儲存過程方式不支援Mybatis的二級快取,所以要加上useCache=“false”。 在儲存過程中使用引數時,除了配置屬性名外,還需要指定

[譯]RabbitMQ教程C#版 - 遠端過程呼叫(RPC)

原文: [譯]RabbitMQ教程C#版 - 遠端過程呼叫(RPC) 先決條件 本教程假定 RabbitMQ 已經安裝,並執行在localhost標準埠(5672)。如果你使用不同的主機、埠或證書,則需要調整連線設定。 從哪裡獲得幫助 如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯絡我們。

.Net下RabbitMQ的使用(8) -- 遠端過程呼叫RPC

RPC是在計算中是一種常見的模式,是通常我要用訊息佇列來實現RPC有3個關鍵點: 1. 服務的定址 2. 訊息的接收 3. 訊息的關聯 在RabbitMQ的.net客戶端裡,提供了2個類:SimpleRpcClient 和 SimpleRpcServer

.Net下RabbitMQ的使用(7) -- 遠端過程呼叫RPC

RPC是在計算中是一種常見的模式,是通常我要用訊息佇列來實現RPC有3個關鍵點: 1. 服務的定址 2. 訊息的接收 3. 訊息的關聯 在RabbitMQ的.net客戶端裡,提供了2個類:SimpleRpcClient 和 SimpleRpcServer 來讓我們

python採用pika庫使用rabbitmqPublish\Subscribe(訊息釋出\訂閱)

之前的例子都基本都是1對1的訊息傳送和接收,即訊息只能傳送到指定的queue裡,但有些時候你想讓你的訊息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了, Exchange在定義的時候是有型別的,以決定到底是哪些Queue符合條件,可以接收訊息 fanout: 所有bin

Git的使用教程遠端倉庫

     之前的操作我們只是把Git當做了一個倉庫,做本地的一個版本管理,這對Git來說簡直大材小用。Git作為分散式版本控制系統,分佈才是其特色,如何分佈呢?肯定要有一臺機器充當原始的版本庫,其他的機器“克隆”這個原始版本庫,其實每臺機器上的版本都一樣,沒有主次之分,之所

從原理上編碼-- Base64編碼

  開發者對Base64編碼肯定很熟悉,是否對它有很清晰的認識就不一定了。實際上Base64已經簡單到不能再簡單了,如果對它的理解還是模稜兩可實在不應該。大概介紹一下Base64的相關內容,花幾分鐘時間就可以徹底理解它。文章下邊貼了一個Base64的編解碼器,方便閱讀文章的同時來實驗。   一. Base6

RabbitMQ遠端連線RabbitMQ

為了避免汙染宿主系統環境,於是在虛擬機器中搭建了一個linux環境並且安裝了rabbitmq-server。然後在遠端連線的時候一直連線失敗。官網上面給的例子都是在本地使用系統預設的guest使用者連線的。沒有給出遠端連線的例子,於是閱讀文件發現: When th

訊息中介軟體——RabbitMQ高階特性全在這裡!

前言 前面我們介紹了RabbitMQ的安裝、各大訊息中介軟體的對比、AMQP核心概念、管控臺的使用、快速入門RabbitMQ。本章將介紹RabbitMQ的高階特性。分兩篇(上/下)進行介紹。 訊息如何保障100%的投遞成功? 冪等性概念詳解 在海量訂單產生的業務高峰期,如何避免訊息的重複消費的問題?

dubbo原始碼淺析-遠端服務呼叫流程

消費端呼叫遠端服務介面時,使用上和呼叫普通的java介面是沒有任何區別,但是服務消費者和提供者是跨JVM和主機的,客戶端如何封裝請求讓服務端理解請求並且解析服務端返回的介面呼叫結果,服務端如何解析客戶端的請求並且向客戶端返回呼叫結果,這些框架是如何實現的,下面就

Netty原始碼分析 ----- read過程 原始碼分析

在上一篇文章中,我們分析了processSelectedKey這個方法中的accept過程,本文將分析一下work執行緒中的read過程。 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

RabbitMQ入門:遠端過程呼叫(RPC)

假如我們想要呼叫遠端的一個方法或函式並等待執行結果,也就是我們通常說的遠端過程呼叫(Remote Procedure Call)。怎麼辦? 今天我們就用RabbitMQ來實現一個簡單的RPC系統:客戶端傳送一個請求訊息,服務端以一個響應訊息迴應。為了能夠接收到響應,客戶端在傳送訊息的同時傳送一個回撥佇列用來

遠端過程呼叫 RPC 及其協議

遠端過程呼叫 簡介 RPC是遠端過程呼叫(Remote Procedure Call)的縮寫形式。SAP系統RPC呼叫的原理其實很簡單,有一些類似於三層構架的C/S系統,第三方的客戶程式通過介面呼叫SAP內部的標準或自定義函式,獲得函式返回的資料進行處理後顯示或列印。

PHP實現遠端過程呼叫RPC

一、初識RPC RPC(Remote Procedure Call)—遠端過程呼叫,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。 二、工作原理 執行時,一次客戶機對伺服器的RPC呼叫,其內部操作大致有如下十步: 1.呼叫客戶端控制代碼;執行傳送引數

com.microsoft.sqlserver.jdbc.SQLServerException: 傳入的表格格式資料流(TDS)遠端過程呼叫(RPC)協議流不正確。此 RPC 請求中提供了過多的引數。

sqlserver在做批量插入的時候出現這個錯誤: com.microsoft.sqlserver.jdbc.SQLServerException: 傳入的表格格式資料流(TDS)遠端過程呼叫(RPC)協議流不正確。此 RPC 請求中提供了過多的引數。最多應為 2100。

遠端過程呼叫(RPC)詳解

本文介紹了什麼是遠端過程呼叫(RPC),RPC 有哪些常用的方法,RPC 經歷了哪些發展階段,以及比較了各種 RPC 技術的優劣。 什麼是 RPC RPC 是遠端過程呼叫(Remote Procedure Call)的縮寫形式,Birrell 和 N

奇怪的資料插入異常:傳入的表格格式資料流(TDS)遠端過程呼叫(RPC)協議流不正確。

前天完成了手頭的工作後,經理交給我一個bug讓我看下,我接過後看了下Bug資訊,是從未見過的異常,但根據異常資訊提示又很容易判斷出異常原因。 異常資訊:傳入的表格格式資料流(TDS)遠端過程呼叫(RPC)協議流不正確。引數 7 ("@ExchangeRate"): 提供的值不