1. 程式人生 > >RabbitMQ RPC

RabbitMQ RPC

 在《rabbitmq學習2:Work Queues 》中我們已經知道了在多個worker如何分配耗時的任務。如果我現在要在遠端的機器上執行然後得到結果,那應當怎麼做呢?那就要用到RPC(Remote Procedure Call or RPC )了!

   現在來看看來看看Rabbitmq中RPC吧!RPC的工作示意圖如下:


   上圖中的C代表客戶端,S表示伺服器端;Rabbitmq中的RPC流程如下:

1、首先客戶端傳送一個reply_to和corrention_id的請求,釋出到RPC佇列中;

2、伺服器端處理這個請求,並把處理結果釋出到一個回撥Queue,此Queue的名稱應當與reply_to的名稱一致

3、客戶端從回撥Queue中得到先前corrention_id設定的值的處理結果。如果碰到和先前不一樣的corrention_id的值,將會忽略而不是丟擲異常。

  對於上面所提到的回撥Queue中的消費處理使用的是BasicProperties類;而訊息 屬性在AMQP的協議中規定有14個;而很多大部分我們沒有用到。常用的幾個屬性有:

English程式碼 複製程式碼 收藏程式碼
  1. Message properties  
  2. The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:  
  3. delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.   
  4. content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.   
  5. reply_to: Commonly used to name a callback queue.   
  6. correlation_id: Useful to correlate RPC responses with requests.   

 delivery_mode : 標記訊息是永續性訊息還是瞬態資訊。在前面的“Work Queue”中我們已經提到過;

  content_type : 用來描述MIME的型別。如把其型別設定為JSON;

  reply_to : 用於命名一個回撥Queue;

  correlation_id : 用於與相關聯的請求的RPC響應.

  現在我們就開始RPC的程式吧!

client的程式碼如下:

Java程式碼 複製程式碼 收藏程式碼
  1. package com.abin.rabbitmq;  
  2. import java.util.UUID;  
  3. import com.rabbitmq.client.AMQP.BasicProperties;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8. publicclass RPCClient {  
  9.     private Connection connection;  
  10.     private Channel channel;  
  11.     private String requestQueueName = "rpc_queue";  
  12.     private String replyQueueName;  
  13.     private QueueingConsumer consumer;  
  14.     public RPCClient() throws Exception {  
  15.         ConnectionFactory factory = new ConnectionFactory();  
  16.         factory.setHost("localhost");  
  17.         connection = factory.newConnection();  
  18.         channel = connection.createChannel();  
  19.         replyQueueName = channel.queueDeclare().getQueue();  
  20.         consumer = new QueueingConsumer(channel);  
  21.         channel.basicConsume(replyQueueName, true, consumer);  
  22.     }  
  23.     public String call(String message) throws Exception {  
  24.         String response = null;  
  25.         String corrId = UUID.randomUUID().toString();  
  26.         BasicProperties props = new BasicProperties();  
  27.         props.setReplyTo(replyQueueName);  
  28.         props.setCorrelationId(corrId);  
  29.         channel.basicPublish("", requestQueueName, props, message.getBytes());  
  30.         while (true) {  
  31.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  32.             if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
  33.                 response = new String(delivery.getBody(), "UTF-8");  
  34.                 break;  
  35.             }  
  36.         }  
  37.         return response;  
  38.     }  
  39.     publicvoid close() throws Exception {  
  40.         connection.close();  
  41.     }  
  42.     publicstaticvoid main(String[] argv) {  
  43.         RPCClient fibonacciRpc = null;  
  44.         String response = null;  
  45.         try {  
  46.             fibonacciRpc = new RPCClient();  
  47.             System.out.println(" [x] Requesting fib(30)");  
  48.             response = fibonacciRpc.call("30");  
  49.             System.out.println(" [.] Got '" + response + "'");  
  50.             System.out.println(" [x] Requesting fib(-1)");  
  51.             response = fibonacciRpc.call("-1");  
  52.             System.out.println(" [.] Got '" + response + "'");  
  53.             System.out.println(" [x] Requesting fib(a)");  
  54.             response = fibonacciRpc.call("a");  
  55.             System.out.println(" [.] Got '" + response + "'");  
  56.         } catch (Exception e) {  
  57.             e.printStackTrace();  
  58.         } finally {  
  59.             if (fibonacciRpc != null) {  
  60.                 try {  
  61.                     fibonacciRpc.close();  
  62.                 } catch (Exception ignore) {  
  63.                 }  
  64.             }  
  65.         }  
  66.     }  
  67. }  
package com.abin.rabbitmq;

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class RPCClient {
	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	private QueueingConsumer consumer;

	public RPCClient() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		connection = factory.newConnection();
		channel = connection.createChannel();

		replyQueueName = channel.queueDeclare().getQueue();
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
	}

	public String call(String message) throws Exception {
		String response = null;
		String corrId = UUID.randomUUID().toString();

		BasicProperties props = new BasicProperties();
		props.setReplyTo(replyQueueName);
		props.setCorrelationId(corrId);

		channel.basicPublish("", requestQueueName, props, message.getBytes());

		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			if (delivery.getProperties().getCorrelationId().equals(corrId)) {
				response = new String(delivery.getBody(), "UTF-8");
				break;
			}
		}

		return response;
	}

	public void close() throws Exception {
		connection.close();
	}

	public static void main(String[] argv) {
		RPCClient fibonacciRpc = null;
		String response = null;
		try {
			fibonacciRpc = new RPCClient();

			System.out.println(" [x] Requesting fib(30)");
			response = fibonacciRpc.call("30");
			System.out.println(" [.] Got '" + response + "'");
			System.out.println(" [x] Requesting fib(-1)");
			response = fibonacciRpc.call("-1");
			System.out.println(" [.] Got '" + response + "'");
			System.out.println(" [x] Requesting fib(a)");
			response = fibonacciRpc.call("a");
			System.out.println(" [.] Got '" + response + "'");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (fibonacciRpc != null) {
				try {
					fibonacciRpc.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

 server的程式碼如下:

Java程式碼 複製程式碼 收藏程式碼
  1. package com.abin.rabbitmq;  
  2. import com.rabbitmq.client.AMQP.BasicProperties;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.QueueingConsumer;  
  7. publicclass RPCServer {  
  8.     privatestaticfinal String RPC_QUEUE_NAME = "rpc_queue";  
  9.     privatestaticint fib(int n) {  
  10.         if (n > 1)  
  11.             return fib(n - 1) + fib(n - 2);  
  12.         else
  13.             return n;  
  14.     }  
  15.     publicstaticvoid main(String[] argv) {  
  16.         Connection connection = null;  
  17.         Channel channel = null;  
  18.         try {  
  19.             ConnectionFactory factory = new ConnectionFactory();  
  20.             factory.setHost("localhost");  
  21.             connection = factory.newConnection();  
  22.             channel = connection.createChannel();  
  23.             channel.queueDeclare(RPC_QUEUE_NAME, falsefalsefalsenull);  
  24.             channel.basicQos(1);  
  25.             QueueingConsumer consumer = new QueueingConsumer(channel);  
  26.             channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  
  27.             System.out.println(" [x] Awaiting RPC requests");  
  28.             while (true) {  
  29.                 String response = null;  
  30.                 QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  31.                 BasicProperties props = delivery.getProperties();  
  32.                 BasicProperties replyProps = new BasicProperties();  
  33.                 replyProps.setCorrelationId(props.getCorrelationId());  
  34.                 try {  
  35.                     String message = new String(delivery.getBody(), "UTF-8");  
  36.                     int n = Integer.parseInt(message);  
  37.                     System.out.println(" [.] fib(" + message + ")");  
  38.                     response = "" + fib(n);  
  39.                 } catch (Exception e) {  
  40.                     System.out.println(" [.] " + e.toString());  
  41.                     response = "";  
  42.                 } finally {  
  43.                     channel.basicPublish("", props.getReplyTo(), replyProps,  
  44.                             response.getBytes("UTF-8"));  
  45.                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(),  
  46.                             false);  
  47.                 }  
  48.             }  
  49.         } catch (Exception e) {  
  50.             e.printStackTrace();  
  51.         } finally {  
  52.             if (connection != null) {  
  53.                 try {  
  54.                     connection.close();  
  55.                 } catch (Exception ignore) {  
  56.                 }  
  57.             }  
  58.         }  
  59.     }  
  60. }  
package com.abin.rabbitmq;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class RPCServer {
	private static final String RPC_QUEUE_NAME = "rpc_queue";

	private static int fib(int n) {
		if (n > 1)
			return fib(n - 1) + fib(n - 2);
		else
			return n;
	}

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");

			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

			channel.basicQos(1);

			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

			System.out.println(" [x] Awaiting RPC requests");

			while (true) {
				String response = null;

				QueueingConsumer.Delivery delivery = consumer.nextDelivery();

				BasicProperties props = delivery.getProperties();
				BasicProperties replyProps = new BasicProperties();
				replyProps.setCorrelationId(props.getCorrelationId());

				try {
					String message = new String(delivery.getBody(), "UTF-8");
					int n = Integer.parseInt(message);

					System.out.println(" [.] fib(" + message + ")");
					response = "" + fib(n);
				} catch (Exception e) {
					System.out.println(" [.] " + e.toString());
					response = "";
				} finally {
					channel.basicPublish("", props.getReplyTo(), replyProps,
							response.getBytes("UTF-8"));

					channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
							false);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

先執行伺服器端,執行結果如下:

Java程式碼 複製程式碼 收藏程式碼
  1. [x] Awaiting RPC requests  
 [x] Awaiting RPC requests

   再執行執行客戶端,執行結果如下:

Java程式碼 複製程式碼 收藏程式碼
  1. [x] Requesting fib(30)  
  2. [.] Got '832040'
  3. [x] Requesting fib(-1)  
  4. [.] Got '-1'
  5. [x] Requesting fib(a)  
  6. [.] Got ''
 [x] Requesting fib(30)
 [.] Got '832040'
 [x] Requesting fib(-1)
 [.] Got '-1'
 [x] Requesting fib(a)
 [.] Got ''

   在伺服器還可以出現:

Java程式碼 複製程式碼 收藏程式碼
  1. [.] fib(30)  
  2. [.] fib(-1)  
  3. [.] java.lang.NumberFormatException: For input string: "a"

原文: