RabbitMQ RPC
阿新 • Published: 2019-02-05
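In "RabbitMQ Study 2: Work Queues" we saw how to distribute time-consuming tasks among multiple workers. But what if we need to run a function on a remote machine and get the result back? For that we use RPC (Remote Procedure Call).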
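Now let's look at RPC in RabbitMQ. The RPC workflow is shown in the following diagram:
In the diagram, C is the client and S is the server. The RPC flow in RabbitMQ is as follows:
1. The client publishes a request to the RPC queue, with the reply_to and correlation_id properties set.
2. The server processes the request and publishes the result to a callback queue whose name must match the reply_to value.
3. The client reads from the callback queue and takes the result whose correlation_id matches the value it set earlier. A message with a different correlation_id is ignored rather than causing an exception.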
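The three steps above can be sketched without a broker. The following is a minimal in-memory illustration, not RabbitMQ client code: the class `RpcFlowSketch`, its `Message` type, the queue name `reply-1`, and the `"echo:"` placeholder work are all hypothetical names chosen for this example, and a plain map of queues stands in for the broker.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

/** In-memory sketch of the RPC flow; a map of named queues stands in for the broker. */
final class RpcFlowSketch {

    /** A message carrying only the two properties the flow depends on. */
    static final class Message {
        final String replyTo;
        final String correlationId;
        final String body;

        Message(String replyTo, String correlationId, String body) {
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final Map<String, Queue<Message>> queues = new HashMap<>();

    private Queue<Message> queue(String name) {
        return queues.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    /** Puts a message on the named queue. */
    void publish(String queueName, Message message) {
        queue(queueName).add(message);
    }

    /** Step 1: publish a request carrying reply_to and a fresh correlation_id. */
    String sendRequest(String replyQueue, String body) {
        String corrId = UUID.randomUUID().toString();
        publish("rpc_queue", new Message(replyQueue, corrId, body));
        return corrId;
    }

    /** Step 2: take one request and reply to the queue named by its reply_to. */
    void serveOne() {
        Message request = queue("rpc_queue").remove();
        String result = "echo:" + request.body; // placeholder for the real work
        publish(request.replyTo, new Message(null, request.correlationId, result));
    }

    /** Step 3: return the reply with a matching correlation_id, skipping any others. */
    String awaitReply(String replyQueue, String corrId) {
        Message reply;
        while ((reply = queue(replyQueue).poll()) != null) {
            if (corrId.equals(reply.correlationId)) {
                return reply.body;
            }
            // Different correlation_id: ignored silently, no exception.
        }
        return null; // no matching reply has arrived
    }

    public static void main(String[] args) {
        RpcFlowSketch broker = new RpcFlowSketch();
        // A stale reply from some earlier request is already waiting.
        broker.publish("reply-1", new Message(null, "stale-id", "old"));
        String corrId = broker.sendRequest("reply-1", "ping");
        broker.serveOne();
        System.out.println(broker.awaitReply("reply-1", corrId)); // prints "echo:ping"
    }
}
```

The stale message is dropped in step 3 because its correlation_id does not match, which is the behaviour the list describes.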
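The reply_to and correlation_id values used with the callback queue are message properties, which the Java client exposes through the BasicProperties class. The AMQP protocol predefines 14 message properties, most of which are rarely used. The commonly used ones are: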
- delivery_mode: marks a message as persistent (value 2) or transient (any other value). This property was introduced earlier in "Work Queues".
- content_type: describes the MIME type of the encoding. For the commonly used JSON encoding, it is good practice to set it to application/json.
- reply_to: commonly used to name a callback queue.
- correlation_id: used to correlate RPC responses with their requests.
Now let's write the RPC program.
The client code is as follows:
package com.abin.rabbitmq;

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// Note: QueueingConsumer exists only in older amqp-client releases
// (deprecated in 4.x, removed in 5.0).
public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        // Server-named queue that serves as this client's callback queue.
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        // autoAck = true: replies need no manual acknowledgement.
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        // A unique ID per request lets us match the reply to this call.
        String corrId = UUID.randomUUID().toString();

        BasicProperties props = new BasicProperties();
        props.setReplyTo(replyQueueName);
        props.setCorrelationId(corrId);

        // Publish to the default exchange, routed to the request queue.
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        // Block until a reply with our correlation ID arrives; skip any others.
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();

            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");

            System.out.println(" [x] Requesting fib(-1)");
            response = fibonacciRpc.call("-1");
            System.out.println(" [.] Got '" + response + "'");

            // Not a number: the server replies with an empty string.
            System.out.println(" [x] Requesting fib(a)");
            response = fibonacciRpc.call("a");
            System.out.println(" [.] Got '" + response + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}
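The `while (true)` loop in `call` waits forever if the server never replies. One possible way to bound the wait is sketched below, using a `java.util.concurrent.BlockingQueue` as a stand-in for the consumer's delivery queue. The names `ReplyWaiterSketch`, `Reply`, and `awaitReply` are hypothetical and not part of the RabbitMQ client; the sketch only illustrates the matching-with-deadline logic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch: wait for a matching reply, but give up after a timeout. */
final class ReplyWaiterSketch {

    /** Stand-in for a delivery: a correlation ID plus a body. */
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    /**
     * Waits up to timeoutMillis for a reply whose correlation ID equals corrId.
     * Replies with other IDs are skipped. Returns null if none arrives in time.
     * Replies already queued when the deadline passes are still examined.
     */
    static String awaitReply(BlockingQueue<Reply> replies, String corrId, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                long remaining = Math.max(deadline - System.nanoTime(), 0L);
                Reply reply = replies.poll(remaining, TimeUnit.NANOSECONDS);
                if (reply == null) {
                    return null; // timed out with nothing left to read
                }
                if (corrId.equals(reply.correlationId)) {
                    return reply.body;
                }
                // Stale reply for another request: keep waiting.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Reply> replies = new LinkedBlockingQueue<>();
        replies.add(new Reply("other", "stale"));
        replies.add(new Reply("abc", "832040"));
        System.out.println(awaitReply(replies, "abc", 1000)); // prints "832040"
        System.out.println(awaitReply(replies, "abc", 50));   // prints "null" after the timeout
    }
}
```

Returning null on timeout leaves it to the caller to decide whether to retry or fail the request.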
The server code is as follows:
package com.abin.rabbitmq;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// Note: QueueingConsumer exists only in older amqp-client releases
// (deprecated in 4.x, removed in 5.0).
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    // Naive recursive Fibonacci; returns n unchanged for n <= 1, so fib(-1) is -1.
    private static int fib(int n) {
        if (n > 1)
            return fib(n - 1) + fib(n - 2);
        else
            return n;
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // Deliver at most one unacknowledged request to this server at a time.
            channel.basicQos(1);

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // autoAck = false: each request is acknowledged after its reply is sent.
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();

                // Copy the correlation ID so the client can match the reply.
                BasicProperties replyProps = new BasicProperties();
                replyProps.setCorrelationId(props.getCorrelationId());
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    // Invalid input: log the error and reply with an empty string.
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    // Reply to the callback queue named by reply_to, then ack the request.
                    channel.basicPublish("", props.getReplyTo(), replyProps,
                            response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                            false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}
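The request handling inside the server loop can be isolated as a pure function, which makes the reply for each input easy to check. The sketch below is an illustration rather than part of the original program: `FibHandlerSketch` and `handle` are hypothetical names, and the iterative `fib` is a swapped-in alternative to the recursive version above that returns the same values in linear time. As with the recursive version, `int` overflows for n greater than 46.

```java
/** Sketch: the server's request-to-reply mapping as a pure function. */
final class FibHandlerSketch {

    /** Iterative Fibonacci; returns n unchanged for n <= 1, like the recursive version. */
    static int fib(int n) {
        if (n <= 1) {
            return n;
        }
        int prev = 0;
        int curr = 1;
        for (int i = 2; i <= n; i++) {
            int next = prev + curr;
            prev = curr;
            curr = next;
        }
        return curr;
    }

    /** Maps a request body to a reply body; non-numeric input yields an empty reply. */
    static String handle(String body) {
        try {
            return String.valueOf(fib(Integer.parseInt(body)));
        } catch (NumberFormatException e) {
            return ""; // same empty reply the server sends on bad input
        }
    }

    public static void main(String[] args) {
        System.out.println("'" + handle("30") + "'"); // prints '832040'
        System.out.println("'" + handle("-1") + "'"); // prints '-1'
        System.out.println("'" + handle("a") + "'");  // prints ''
    }
}
```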
Run the server first. Its output is:
[x] Awaiting RPC requests
Then run the client. Its output is:
[x] Requesting fib(30)
[.] Got '832040'
[x] Requesting fib(-1)
[.] Got '-1'
[x] Requesting fib(a)
[.] Got ''
The server console then also shows:
[.] fib(30)
[.] fib(-1)
[.] java.lang.NumberFormatException: For input string: "a"