1. 程式人生 > >Spring Boot整合RabbitMQ

Spring Boot整合RabbitMQ

訂閱程式

  • 原始碼
@Bean
public SimpleMessageListenerContainer customerMessageContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(orderConnectionFactory());
    container.setQueues(recoveryQueue());
    container.setExposeListenerChannel(true);
    container.setPrefetchCount
(1);// 每次只消費1條 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 設定手動應答 container.setConcurrentConsumers(1); // Specify the number of concurrent consumers to create. Default is 1. container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message,
Channel channel) throws Exception { byte[] body = message.getBody(); String bodyStr = new String(body); try { LOGGER.info("正在推送99單號回收彙總資訊"); dailyRecoveryGatherService.pushDailyRecoveryGather(bodyStr); // 確認應答(是否批量.true:將一次性ack所有小於deliveryTag的訊息)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); LOGGER.info("99單號回收彙總推送完成,訊息內容:" + bodyStr); } catch (Exception e) { LOGGER.error("99單號回收彙總推送失敗,訊息內容:" + bodyStr, e); /** * 拒絕應答(true表示重新入佇列, * false表示直接從佇列中刪除,此時和basicAck(long deliveryTag, false)的效果一樣) */ channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } }); return container; }