RabbitMQ學習筆記(四)-----------------RPC
阿新 • • 發佈:2018-11-24
專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/RPC_RabbitMQ
RPC遠端服務呼叫,舉個例子就是客戶端遠端呼叫服務端的方法幫自己運算,並把結果返回
流程圖:
專案結構
Client
- 建立一個反饋佇列,這個佇列的作用是等待伺服器返回處理結果
- 傳送請求,請求的內容包含,待處理的資訊,correlaitonId(用來檢查反饋內容是否是自己需要 的),反饋佇列的名字
- 從反饋佇列中取出資訊,對資訊進行驗證(correlationId),驗證通過,返回結果
package com.ggp; import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; /** * @ClassName Client * @Description TODO * @Author Mr.G * @Date 2018/11/21 16:39 * @Version 1.0 */ public class Client { private Connection connection; private Channel channel; /** * 請求佇列名 */ private String requestQueueName = "my_queue"; /** * 等待響應佇列名 */ private String replyQueueName; private QueueingConsumer consumer; public Client() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connection = connectionFactory.newConnection(); channel = connection.createChannel(); /** * 監聽反饋佇列的狀態 */ replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName,true,consumer); } public String call(String message) throws IOException, InterruptedException{ String result; String corrID = UUID.randomUUID().toString(); /** * 建造者模式,建立配置檔案,把corrID和需要反饋資訊的佇列名放進去 */ AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(corrID).replyTo(replyQueueName).build(); channel.basicPublish("",requestQueueName,properties,message.getBytes()); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if(delivery.getProperties().getCorrelationId().equals(corrID)){ result = new String(delivery.getBody(), "UTF-8"); break; } } return result; } public void close() throws Exception{ connection.close(); } public static void main(String[] args)throws Exception{ Client client = new Client(); String result = client.call("20"); System.out.println(result); client.close(); } }
Server
- 接收客戶端資訊,處理內容,計算結果
- 通過接受的資訊,得到反饋佇列的名字,給該佇列傳送資訊,資訊內容包括correlationId和處理結果
package com.ggp; import com.rabbitmq.client.*; import java.io.IOException; import java.net.ConnectException; import java.util.concurrent.TimeoutException; /** * @ClassName Server * @Description TODO * @Author Mr.G * @Date 2018/11/21 17:10 * @Version 1.0 */ public class Server { private static final String RPC_QUEUE_NAME = "my_queue"; private static int fib(int n){ if(n == 0){ return 0; } if(n == 1){ return 1; } return fib(n-1) + fib(n -1); } public Server() throws IOException, TimeoutException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME,false,false, false,null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME,false,consumer); System.out.println("Server is waiting request"); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(delivery.getProperties().getCorrelationId()).build(); String message = new String(delivery.getBody(),"UTF-8"); int n = Integer.parseInt(message); System.out.println("receive the message : "+n); String response = ""+fib(n); channel.basicPublish("",delivery.getProperties().getReplyTo(),properties,response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
測試方法
package com.ggp.test;
import com.ggp.Client;
/**
* @ClassName ClintTest
* @Description TODO
* @Author Mr.G
* @Date 2018/11/22 8:41
* @Version 1.0
*/
public class ClintTest {
public static void main(String[] args)throws Exception{
Client client = new Client();
String result = client.call("20");
System.out.println(result);
}
}
package com.ggp.test;
import com.ggp.Client;
import com.ggp.Server;
/**
* @ClassName Test
* @Description TODO
* @Author Mr.G
* @Date 2018/11/22 8:39
* @Version 1.0
*/
public class ServerTest {
public static void main(String[] args)throws Exception{
new Server();
}
}
測試結果