Spring Boot整合RabbitMQ
阿新 • • 發佈:2018-12-31
訂閱程式
- 原始碼
@Bean
public SimpleMessageListenerContainer customerMessageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(orderConnectionFactory());
container.setQueues(recoveryQueue());
container.setExposeListenerChannel(true);
container.setPrefetchCount (1);// 每次只消費1條
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 設定手動應答
container.setConcurrentConsumers(1); // Specify the number of concurrent consumers to create. Default is 1.
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
String bodyStr = new String(body);
try {
LOGGER.info("正在推送99單號回收彙總資訊");
dailyRecoveryGatherService.pushDailyRecoveryGather(bodyStr);
// 確認應答(是否批量.true:將一次性ack所有小於deliveryTag的訊息)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
LOGGER.info("99單號回收彙總推送完成,訊息內容:" + bodyStr);
} catch (Exception e) {
LOGGER.error("99單號回收彙總推送失敗,訊息內容:" + bodyStr, e);
/**
* 拒絕應答(true表示重新入佇列,
* false表示直接從佇列中刪除,此時和basicAck(long deliveryTag, false)的效果一樣)
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
});
return container;
}