RabbitMQ中的訊息回撥機制
阿新 • • 發佈:2018-10-31
最近在專案中需要用到RabbitMQ的訊息分發機制,Client端在傳送訊息給Server端處理之後還需要等待Server端的處理結果,開始很是困惑Server端如何將處理完成之後的結果再返回給相應傳送這個訊息的Client端,直到翻閱官方的資料才發現,Client端在傳送訊息的同時可以一併在 BasicProperties中將回調地址與collectionId一起傳送到Server端,Server端在處理完成之後就可以通過這兩個引數來將處理結果相應的再返回給傳送此訊息的Client端。
Client端與Server的端具體實現如下。
Client端:
package cn.muyi .RabbitMQ;
import java.io.IOException;
import java.util.UUID;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq .client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCClient {
private static String requestQueueName = "rpc_queue";
private static String replyQueueName;
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
// 建立連線和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的服務端Host");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//若此處不宣告佇列,則訊息傳送者先執行而佇列未宣告時訊息會丟失。
//channel.queueDeclare(requestQueueName, false, false, false, null);
replyQueueName = channel.queueDeclare().getQueue();
String message = "Welcome MuYi";
String collectionId = UUID.randomUUID().toString();
//存入回撥佇列名與collectionId
BasicProperties bpro = new BasicProperties().builder().correlationId(collectionId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, bpro, message.getBytes());
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費佇列 關閉ack機制
channel.basicConsume(replyQueueName, true, consumer);
String responseMsg = null;
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if(collectionId.equals(delivery.getProperties().getCorrelationId())){
responseMsg = new String(delivery.getBody());
break;
}
}
System.out.println("Received "+responseMsg);
}
}
Server端:
package cn.muyi.RabbitMQ;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class RpcReceiver {
private static String requestQueueName = "rpc_queue";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
// 建立連線和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的服務端Host");
Connection connection = factory.newConnection();
//宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
Channel channel = connection.createChannel();
channel.queueDeclare(requestQueueName, false, false, false, null);
//建立佇列消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費佇列 開啟ack機制
channel.basicConsume(requestQueueName, false,consumer);
while(true){
//nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received Client Message:"+message);
//獲取回撥佇列名與Correlation Id
BasicProperties bpro = delivery.getProperties();
String replName = bpro.getReplyTo();
BasicProperties replBP = new BasicProperties().builder().correlationId(bpro.getCorrelationId()).build();
String responseMsg = "Just Do It";
channel.basicPublish("", replName, replBP, responseMsg.getBytes());
//傳送應答,通過delivery.getEnvelope().getDeliveryTag()獲取此次確認的訊息的序列號
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
注:BasicProperties 中常用引數說明
Message properties
The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
replyTo: Commonly used to name a callback queue.
correlationId: Useful to correlate RPC responses with requests