1. 程式人生 > >7.RabbitMQ RFC同步調用

7.RabbitMQ RFC同步調用

port print exceptio 啟動 fault 客戶端 cti sina ()

RabbitMQ RFC同步調用是使用了兩個異步調用完成的,生產者調用消費者的同時,自己也作為消費者等待某一隊列的返回消息,消費者接受到生產者的消息同時,也作為消息發送者發送一消息給生產者。參考下圖:

技術分享圖片
調用流程如下: 技術分享圖片
其他的消息服務器實現同步調用也是類似的原理,比如ActiveMQ。 下面編寫消費者類Server 技術分享圖片
技術分享圖片
生產者Client代碼 技術分享圖片
技術分享圖片
啟動一命令行,將當前目錄轉移到項目所在的目錄 技術分享圖片
在Eclipse中運行Client 技術分享圖片
代碼: package com.test.rfc; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Server { public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.142"); //使用默認端口5672 Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); String queueName = "queue_rpc"; channel.queueDeclare(queueName, false, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("test1" + new String(body)); AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() .correlationId(properties.getCorrelationId()) .build(); String response = "hello client,I‘m rfc server"; channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queueName, false, consumer); } catch (IOException e) { e.printStackTrace(); } } } package com.test.rfc; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.rabbitmq.client.*; public class Client { public static void main(String[] argv) { try { //發送消息的隊列,Server在這個隊列上接受消息 String queueName = "queue_rpc"; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.142"); //使用默認端口5672 Connection connection = null; connection = factory.newConnection(); Channel channel = connection.createChannel(); //生成臨時的隊列,Client在這隊列上等待Server返回信息,Server向這個隊列發消息 String replyQueueName = channel.queueDeclare().getQueue(); //生成唯一ID final String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .correlationId(corrId).replyTo(replyQueueName).build(); //客戶端發送RFC請求 channel.basicPublish("", queueName, props, "GetUserInfo".getBytes()); //Server返回消息 final BlockingQueue response = new ArrayBlockingQueue(1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(properties.getCorrelationId()); if (properties.getCorrelationId().equals(corrId)) { response.offer(body); } } }); byte[] b = response.take(); System.out.println(new String(b)); channel.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } } }

7.RabbitMQ RFC同步調用