1. 程式人生 > >Spring-rabbitmq 實現RPC 風格呼叫例項

Spring-rabbitmq 實現RPC 風格呼叫例項

1.  背景

專案中原來利用rabbitmq的RPC實現遠端方法呼叫,比較簡陋,封裝的比較差,而且topic等模式均為阻塞,如今我負責進行改造,今天重點看看如何利用spring-rabbitmq實現RPC風格的呼叫 簡單說來,RPC,主要目的是利用message實現遠端方法的呼叫

2.   關鍵程式碼

2.1  RPC  client

@Override
    public <T> void publishRPCMessage(String exchangeName, String topic, T t) throws NullExchangeNameException, NullTopicException, NullMessageException {

	    if(t==null)	{
			throw new NullMessageException("Null message");
		}
	    validateExchangeName(exchangeName);
	    validateTopic(topic);
	    try	{
		    MDC.put("exchange", exchangeName);
		    MDC.put("topic", topic);
			RabbitTemplate rabbitTempete=new RabbitTemplate(connectionFactory);
			rabbitTempete.setExchange(exchangeName);
			rabbitTempete.setRoutingKey(topic);
			Queue  replyqQueue=replyQueue();
			admin.declareQueue(replyqQueue);
			
			rabbitTempete.setReplyQueue(replyqQueue);
			//the reply listener
			SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
			container.setConnectionFactory(connectionFactory);
			// the queue to receive message
			container.setQueues(replyqQueue);
			container.setMessageListener(rabbitTempete);
			container.start();
			
			Object  response=rabbitTempete.convertSendAndReceive(t);
	        LOGGER.info("Added replyqueue listener");
        	LOGGER.debug("Published message for: " + t+"and the response is"+response);
	
		} catch(Exception ex) {
			LOGGER.error("Failed to publish message", ex);
		} finally {
		    MDC.remove("exchange");
		    MDC.remove("topic");
		}
		
	}
其中核心點是: 第一個宣告reply queue,第二監聽reply queue,這個與rabbitmq一樣,就是api不同而已。 最後利用
container.setMessageListener(rabbitTempete);

Object  response=rabbitTempete.convertSendAndReceive(t);
傳送message,並接受訊息。

2.2 RPC server

	public <T> void addRPCMessageHandler(String exchangeName, String topic,RPCMessageHandler<T> handler) throws NullExchangeNameException, NullTopicException, NullMessageException {
	    validateExchangeName(exchangeName);
	    validateTopic(topic);
	    try	{
		    MDC.put("exchange", exchangeName);
		    MDC.put("topic", topic);
		    validateExchangeName(exchangeName);
			SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
			container.setConnectionFactory(connectionFactory);
			Queue receiveQueue=getAutoGeneratedQueue(exchangeName,ExchangeType.TOPIC, topic);
			container.setQueues(receiveQueue);
			container.setMessageListener(new MessageListenerAdapter(handler));
	    	container.start();
		    
		    
	    } catch(Exception ex) {
			LOGGER.error("Failed to handle message", ex);
		} finally {
		    MDC.remove("exchange");
		    MDC.remove("topic");
		}
		
	}


RPC server的邏輯與普通的訊息處理邏輯一樣,沒有什麼額外邏輯。