Spring-rabbitmq 實現RPC 風格呼叫例項
阿新 • • 發佈:2019-02-02
1. 背景
專案中原來利用rabbitmq的RPC實現遠端方法呼叫,比較簡陋,封裝的比較差,而且topic等模式均為阻塞,如今我負責進行改造,今天重點看看如何利用spring-rabbitmq實現RPC風格的呼叫 簡單說來,RPC,主要目的是利用message實現遠端方法的呼叫2. 關鍵程式碼
2.1 RPC client
其中核心點是: 第一個宣告reply queue,第二監聽reply queue,這個與rabbitmq一樣,就是api不同而已。 最後利用@Override public <T> void publishRPCMessage(String exchangeName, String topic, T t) throws NullExchangeNameException, NullTopicException, NullMessageException { if(t==null) { throw new NullMessageException("Null message"); } validateExchangeName(exchangeName); validateTopic(topic); try { MDC.put("exchange", exchangeName); MDC.put("topic", topic); RabbitTemplate rabbitTempete=new RabbitTemplate(connectionFactory); rabbitTempete.setExchange(exchangeName); rabbitTempete.setRoutingKey(topic); Queue replyqQueue=replyQueue(); admin.declareQueue(replyqQueue); rabbitTempete.setReplyQueue(replyqQueue); //the reply listener SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // the queue to receive message container.setQueues(replyqQueue); container.setMessageListener(rabbitTempete); container.start(); Object response=rabbitTempete.convertSendAndReceive(t); LOGGER.info("Added replyqueue listener"); LOGGER.debug("Published message for: " + t+"and the response is"+response); } catch(Exception ex) { LOGGER.error("Failed to publish message", ex); } finally { MDC.remove("exchange"); MDC.remove("topic"); } }
container.setMessageListener(rabbitTempete);
Object response=rabbitTempete.convertSendAndReceive(t);
傳送message,並接受訊息。
2.2 RPC server
public <T> void addRPCMessageHandler(String exchangeName, String topic,RPCMessageHandler<T> handler) throws NullExchangeNameException, NullTopicException, NullMessageException { validateExchangeName(exchangeName); validateTopic(topic); try { MDC.put("exchange", exchangeName); MDC.put("topic", topic); validateExchangeName(exchangeName); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); Queue receiveQueue=getAutoGeneratedQueue(exchangeName,ExchangeType.TOPIC, topic); container.setQueues(receiveQueue); container.setMessageListener(new MessageListenerAdapter(handler)); container.start(); } catch(Exception ex) { LOGGER.error("Failed to handle message", ex); } finally { MDC.remove("exchange"); MDC.remove("topic"); } }
RPC server的邏輯與普通的訊息處理邏輯一樣,沒有什麼額外邏輯。