1. 程式人生 > >第九篇:Spring Boot整合RabbitMQ

第九篇:Spring Boot整合RabbitMQ

RabbitMQ是一個開源的訊息代理和佇列伺服器,用來通過普通協議在完全不同的應用之間共享資料,或者簡單地將作業佇列以便讓分散式伺服器進行處理。訊息佇列使用訊息將應用程式連線起來,這些訊息通過像RabbitMQ這樣的訊息代理伺服器在應用程式之間路由。這篇文章將帶你瞭解怎麼整合RabbitMQ伺服器,並且通過它怎麼去傳送和接收訊息。

構建工程

構架一個SpringBoot工程,其pom檔案依賴加上spring-boot-starter-amqp的起步依賴:

<dependency>
	<groupId>org.springframework.boot</
groupId
>
<artifactId>spring-boot-starter-amqp</artifactId> </dependency>

訊息交換機配置

@Configuration
public class ExchangeConfig {
    @Bean
    public DirectExchange directExchange(){
        DirectExchange directExchange = new DirectExchange(RabbitConfig.EXCHANGE,true,false);
        return
directExchange; } }

佇列配置

@Configuration
public class QueueConfig {

    @Bean
    public Queue Queue1() {
        return new Queue("first-queue",true,false,false);
    }

    @Bean
    public Queue Queue2() {
        return new Queue("second-queue",true,false,false);
    }
}

RabbitMq配置

@Configuration
public class RabbitMqConfig { /** 訊息交換機的名字*/ public static final String EXCHANGE = "exchangeTest"; /** 佇列key1*/ public static final String ROUTINGKEY1 = "queue_one_key1"; /** 佇列key2*/ public static final String ROUTINGKEY2 = "queue_one_key2"; @Autowired private QueueConfig queueConfig; @Autowired private ExchangeConfig exchangeConfig; /** * 連線工廠 */ @Autowired private ConnectionFactory connectionFactory; /** 將訊息佇列1和交換機進行繫結 */ @Bean public Binding binding_one() { return BindingBuilder.bind(queueConfig.Queue1()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1); } /** * 將訊息佇列2和交換機進行繫結 */ @Bean public Binding binding_two() { return BindingBuilder.bind(queueConfig.Queue2()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2); } /** * queue listener 觀察 監聽模式 * 當有訊息到達時會通知監聽在對應的佇列上的監聽物件 * @return */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); simpleMessageListenerContainer.addQueues(queueConfig.Queue1()); simpleMessageListenerContainer.setExposeListenerChannel(true); simpleMessageListenerContainer.setMaxConcurrentConsumers(5); simpleMessageListenerContainer.setConcurrentConsumers(1); simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認 return simpleMessageListenerContainer; } /** * 定義rabbit template用於資料的接收和傳送 * @return */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setConfirmCallback(msgSendConfirmCallBack()); return template; } /** * 訊息確認機制 * Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些訊息被broker處理, * 哪些可能因為broker宕掉或者網路失敗的情況而重新發布。 * 確認並且保證訊息被送達,提供了兩種方式:釋出確認和事務。(兩者不可同時使用) * 在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。 * @return */ @Bean public MsgSendConfirmCallBack msgSendConfirmCallBack() { return new MsgSendConfirmCallBack(); } }

訊息回撥

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info("MsgSendConfirmCallBack  , 回撥id:" + correlationData);
        if (ack) {
            logger.info("success");
        } else {
            logger.error("fail" + cause+"\n重新發送");
        }
    }
}

生產者

@Service
public class SendService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 傳送訊息
     * @param uuid
     * @param message  訊息
     */
    public void send(String uuid,Object message) {
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY2,
                message, correlationId);
    }

}

消費者

@Component
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitListener(queues = {"first-queue", "second-queue"},
            containerFactory = "rabbitListenerContainerFactory")
    public void handleMessage(String message) throws Exception {
        logger.info("Consumer handleMessage : " + message);
    }
}

Controller

@RestController
public class SendController {

    @Autowired
    private SendService sendService;

    @GetMapping("/send/{message}")
    public String send(@PathVariable String message){
        String uuid = UUID.randomUUID().toString();
        sendService.send(uuid,message);
        return uuid;
    }
}

測試

在這裡插入圖片描述
產看控制檯
在這裡插入圖片描述
恭喜!你剛才已經學會了如何通過spring raabitmq去構建一個訊息傳送和訂閱的程式。

原始碼下載:https://github.com/chenjary/SpringBoot