1. 程式人生 > >Spring Boot AMQP+RabbitMq

Spring Boot AMQP+RabbitMq

主要解決兩個問題:

1如何傳送/釋出訊息(將java物件序列化為json),通過使用RabbitTemplate

2如何讀取消費訊息(json反序列化為java物件)使用@RabbitListener註解

amqp是一個規範,一種定義了兩個系統間通過訊息進行互動的協議,而RabbitMQ則是實現了該規範的一個強大的中介軟體產物。

我們通過設定兩個監聽佇列分別演示通用訊息的處理及自定義訊息類的處理。

Queues and process view

 

maven依賴:  

1

2

3

4

<dependency>

   <groupId>org.springframework.amqp</groupId>

   <artifactId>spring-rabbit</artifactId>

</dependency> 

javaBean:CustomMessage

public class CustomMessage implements Serializable{

    private String text;
    private int priority;
    private boolean secret;

    // Default constructor is needed to deserialize JSON
    public CustomMessage() {
    }

    public CustomMessage(String text, int priority, boolean secret) {
        this.text = text;
        this.priority = priority;
        this.secret = secret;
    }

    public String getText() {
        return text;
    }

    public int getPriority() {
        return priority;
    }

    public boolean isSecret() {
        return secret;
    }

    @Override
    public String toString() {
        return "CustomMessage{" +
                "text='" + text + '\'' +
                ", priority=" + priority +
                ", secret=" + secret +
                '}';
    }
}

訊息生產者:

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

@Component

public class CustomMessageSender {

 

    private static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);

 

    private final RabbitTemplate rabbitTemplate;

 

    @Autowired

    public CustomMessageSender(final RabbitTemplate rabbitTemplate) {

        this.rabbitTemplate = rabbitTemplate;

    }

 

    @Scheduled(fixedDelay = 3000L)

    public void sendMessage() {

        final CustomMessage message = new CustomMessage("Hello there!", new Random().nextInt(50), false);

        log.info("Sending message...");

        rabbitTemplate.convertAndSend(MessagingApplication.EXCHANGE_NAME, MessagingApplication.ROUTING_KEY,         message);

    }

}

   

 訊息消費者:

@Component

public class CustomMessageListener {

    private static final Logger log = LoggerFactory.getLogger(CustomMessageListener.class);

    @RabbitListener(queues = MessagingApplication.QUEUE_GENERIC_NAME)

    public void receiveMessage(final Message message) {

        log.info("Received message as generic: {}", message.toString());

    }

    @RabbitListener(queues = MessagingApplication.QUEUE_SPECIFIC_NAME)

    public void receiveMessage(final CustomMessage customMessage) {

        log.info("Received message as specific class: {}", customMessage.toString());

    }

}

SpringBoot主類:

@SpringBootApplication
@EnableRabbit
@EnableScheduling
public class MessagingApplication implements RabbitListenerConfigurer {

@Bean
public TopicExchange appExchange() {
    return new TopicExchange(EXCHANGE_NAME);
}

@Bean
public Queue appQueueGeneric() {
    return new Queue(QUEUE_GENERIC_NAME);
}

@Bean
public Queue appQueueSpecific() {
    return new Queue(QUEUE_SPECIFIC_NAME);
}

@Bean
public Binding declareBindingGeneric() {
    return BindingBuilder.bind(appQueueGeneric()).to(appExchange()).with(ROUTING_KEY);
}

@Bean
public Binding declareBindingSpecific() {
    return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
}

@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
    return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
   return new MappingJackson2MessageConverter();
}

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
   DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
   factory.setMessageConverter(consumerJackson2MessageConverter());
   return factory;
}

@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
   registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}

@[email protected] 用於啟用rabbit相關的註解和定時任務相關注解,當前設定的3000L單位為毫秒。