RabbitMQ-RPC
前篇文章,講了如何使用工作佇列在多個消費者之間分配耗時的任務.如果我們需要在遠端計算機上執行一個方法並等待結果,這個模式通常叫rpc
Client interface(客戶端介面)
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
Callback queue(回撥佇列)
通常在RabbitMQ上做RPC很容易。 客戶端傳送請求訊息,伺服器回覆響應訊息。 為了接收響應,我們需要在請求訊息中傳送一個回撥佇列。 我們可以使用預設佇列(僅限Java客戶端)。
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
訊息屬性
AMQP協議預定義了一組帶有訊息的14個屬性。 大多數屬性很少使用,但以下情況除外:
* deliveryMode:訊息傳遞模式,2.訊息持久化,其他值-瞬態.
* contentType:用於描述編碼的mime型別。 例如,對於經常使用的JSON編碼,將此屬性設定為:application / json是一種很好的做法。
* replyTo:回撥佇列名字
* correlationId:用於將RPC響應與請求相關聯。
Correlation Id
如果為每個rpc請求,都建立一個回撥佇列,會很低效,所以可以採用: 為每個客戶端建立一個單一的回撥佇列.
我們要為每個請求設定唯一的值,在回撥佇列中獲取訊息,correlation id, 關聯response和request就是基於這個屬性值的.如果看到一個未知的correlationid屬性值的訊息,拋棄它
您可能會問,為什麼我們應該忽略回撥佇列中的未知訊息,而不是失敗並出現錯誤?這是由於伺服器端可能存在競爭條件。雖然不太可能,但RPC伺服器可能會在向我們傳送結果後,傳送請求的確認訊息之前宕調了,這就有可能傳送未知的correlationid屬性值了。如果發生這種情況,重新啟動的RPC伺服器將再次處理請求。這就是為什麼在客戶端上我們必須優雅地處理重複的響應,理想情況下RPC應該是冪等的。
Summary
流程
* 客戶端啟動時,會建立一個匿名的獨佔回撥佇列。
* 對於RPC請求,客戶端傳送帶有兩個屬性的訊息:replyTo(設定為回撥佇列)和correlationId(設定為每個請求的唯一值)。
* 請求被髮送到rpc_queue佇列。
* RPC 伺服器監聽rpc_queue佇列中請求. 當出現請求時,它會執行該作業,接收佇列就是replyTo設定的回撥佇列。
* 客戶端監聽回撥佇列。 出現訊息時,它會檢查correlationId屬性。 如果它與請求中的值匹配,則返回對應用程式的響應。
(客戶端,我有一堆東西,你處理一下,correlationid,我的請求標識,replyto.處理完結果返回這個佇列中
服務端:處理完了,返回,correlationid,客戶端拿它的id和服務端返回的id對比,然後進行接收)
程式碼
package com.tgb.kwy.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* Description
*
* @author kongwy [email protected]
* @version 1.0
* @date 2018-07-11-09 -27
*/
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.159.132");/*設定rabbitmq所在主機ip或主機名*/
/*指定使用者名稱和密碼*/
factory.setUsername("admin");
factory.setPassword("admin");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
}
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", replyQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
public void close() throws IOException{
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
}
catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
finally {
if (fibonacciRpc!= null) {
try {
fibonacciRpc.close();
}
catch (IOException _ignore) {}
}
}
}
}
package com.tgb.kwy.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Description
*
* @author kongwy [email protected]
* @version 1.0
* @date 2018-07-11-09 -10
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME="rpc_queue";
private static int fib(int n) {
if (n ==0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.159.132");/*設定rabbitmq所在主機ip或主機名*/
/*指定使用者名稱和密碼*/
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection=null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
String response = "";
try {
String message = new String(body,"UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
}
catch (RuntimeException e){
System.out.println(" [.] " + e.toString());
}
finally {
channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized(this) {
this.notify();
}
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized(consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {}
}
}
}
執行結果
注:程式碼來自官網.官網對於概念性的內容,講解的還是很清楚的