RabbitMQ 實現 RPC 例項
阿新 • • 發佈:2019-02-12
最近閱讀《RabbitMQ實戰指南》,瞭解了 RPC(remote procedure call,遠端過程呼叫)的實現。下面是測試的例子:
服務端
/** * <p> * * rpc伺服器,1、開啟佇列,2、消費訊息,3、把response傳送到回撥佇列。 * * </p> * @author hz16092620 * @date 2018年9月16日 上午10:08:23 * @version */ public class RpcServer { public static void main(String[] args) { consumerMessage(); } /** * 服務端消費訊息 * */ public static void consumerMessage() { Connection conn = RabbitConnection.createConnection();//自定義的獲取連結的方法 try { String queneName = "rpc_liuhp_quene"; final Channel channel = conn.createChannel(); channel.queueDeclare(queneName, false, true, false, null); //消費訊息,推模式 channel.basicQos(10);//最多消費訊息個數 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { StringBuilder sb = new StringBuilder(); for (byte b : body) { sb.append((char) b); } System.out.println(sb.toString()); BasicProperties props = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build(); channel.basicPublish("", properties.getReplyTo(), props, "result".getBytes());//消費訊息之後返回result } }; channel.basicConsume(queneName, true, consumer); } catch (IOException e) { e.printStackTrace(); } finally { //連線不關閉,一直處於開啟狀態 } } }
客戶端:
/** * <p> * * 客戶端傳送訊息,然後接收訊息, * * </p> * @author hz16092620 * @date 2018年9月16日 上午10:27:08 * @version */ public class RpcClient { public static void main(String[] args) { createClient(); } /** * 交換器傳送資料 * */ public static void createClient() { Connection conn = RabbitConnection.createConnection(); Channel channel = null; try { channel = conn.createChannel(); String queneName = channel.queueDeclare().getQueue(); final String uuid = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties().builder().correlationId(uuid).replyTo(queneName).build(); channel.basicPublish("", "rpc_liuhp_quene", props, String.valueOf(new Random().nextInt(100)).getBytes());// 傳送訊息 // 接受返回的結果 // 方式一 queneingConsumer 很多的問題,已經在3.0之後遺棄了。這個暫時不研究 // QueueingConsumer consume = new QueueingConsumer(channel); /* * while (true) { QueueingConsumer.Delivery delivery = * consume.nextDelivery(); if * (delivery.getProperties().getCorrelationId().equals(uuid)) { * System.out.println(new String(delivery.getBody())); } break; } */ // 方式二 defaultConsumer Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { while (true) { if (properties.getCorrelationId().equals(uuid)) { StringBuilder sb = new StringBuilder(); for (byte b : body) { sb.append((char) b); } System.out.println(sb.toString()); } break; } } }; channel.basicConsume(queneName, true, consumer); Thread.sleep(30000L); } catch (IOException | InterruptedException e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }