程式人生 > RabbitMQ 整合 Spring Boot

rabbitmq 整合spring boot

引入包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置檔案 application.properties

spring.rabbitmq.port=5672
spring.rabbitmq.virtualHost=/
spring.rabbitmq.host=192.168.74.164
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

配置類


import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework
.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection
.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @Configuration public class RabbitMqConfig { public static final String GET_TICKET = "test"; @Bean public Queue getTicket() { return new Queue(GET_TICKET, true); // 佇列持久 } @Bean public TopicExchange topicExchange(){ return new TopicExchange("ticket_default_exchange"); } @Bean Binding bindingExchangeMessage(Queue getTicket, TopicExchange topicExchange) { return BindingBuilder.bind(getTicket).to(topicExchange).with("#"); } /** * Rabbit監聽容器工廠 * @param configurer * @param connectionFactory * @return */ @Bean("ticketRabbitmqContainerFactory") public SimpleRabbitListenerContainerFactory rabbitmqContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setPrefetchCount(1);//一次去幾個訊息 factory.setConcurrentConsumers(1);//幾個消費者 // 執行緒的執行緒池 ExecutorService executorService = Executors.newFixedThreadPool(1);//消費者執行緒池(只能大於等於消費者,最好等於) factory.setTaskExecutor(executorService); configurer.configure(factory, connectionFactory); return factory; } }

消費者



import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.hhly.ticket.service.ticket.rabbitmq.RabbitMqConfig;

@Component
public class TicketListenter implements MessageListener {

    @RabbitListener(queues=RabbitMqConfig.GET_TICKET, containerFactory="rabbitmqContainerFactory")
    public void onMessage(Message message) {
        String result  = new String(message.getBody(), "UTF-8");
    }

}