Spring Boot與RabbitMQ的整合訊息確認
訊息生產者和消費者
import com.rabbitmq.client.Channel;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.io.IOException; import java.util.Date; import java.util.UUID; /** * Created by yangliu on 2018/4/8. */ @Controller @RequestMapping("/rabbitMq") public class TestController { private Logger logger= LoggerFactory.getLogger(TestController.class); @Autowired RabbitAdmin rabbitAdmin; @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.getRabbitTemplate().setConfirmCallback(new MsgSendConfirmCallBack()); rabbitAdmin.getRabbitTemplate().setReturnCallback(new MsgSendReturnCallback()); return rabbitAdmin; } @RequestMapping("/sendMq") @ResponseBody public String send(String name) throws Exception { String context = "hello "+name+" --" + new Date(); String sendStr; for(int i=1;i<=100;i++){ sendStr="第["+i+"]個 hello --" + new Date(); logger.debug("HelloSender: " + sendStr); sendMessage("myqueue",sendStr); //Thread.sleep(1000); } return context; } /** * 方式一:動態宣告exchange和queue它們的繫結關係 rabbitAdmin * @param exchangeName * @param queueName */ protected void declareBinding(String exchangeName, String queueName) { if (rabbitAdmin.getQueueProperties(queueName) == null) { /* queue 佇列宣告 durable=true,交換機持久化,rabbitmq服務重啟交換機依然存在,保證不丟失; durable=false,相反 auto-delete=true:無消費者時,佇列自動刪除; auto-delete=false:無消費者時,佇列不會自動刪除 排他性,exclusive=true:首次申明的connection連線下可見; exclusive=false:所有connection連線下*/ Queue queue = new Queue(queueName, true, false, false, null); rabbitAdmin.declareQueue(queue); TopicExchange directExchange = new TopicExchange(exchangeName); rabbitAdmin.declareExchange(directExchange);//宣告exchange Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName); //將queue繫結到exchange rabbitAdmin.declareBinding(binding); //宣告繫結關係 rabbitAdmin.getRabbitTemplate().setMandatory(true); rabbitAdmin.getRabbitTemplate().setConfirmCallback(new MsgSendConfirmCallBack());//訊息確認 rabbitAdmin.getRabbitTemplate().setReturnCallback(new MsgSendReturnCallback());//確認後回撥 } else { rabbitAdmin.getRabbitTemplate().setQueue(queueName); rabbitAdmin.getRabbitTemplate().setExchange(queueName); rabbitAdmin.getRabbitTemplate().setRoutingKey(queueName); } } /** * 傳送訊息 * @param queueName * @param message * @throws Exception */ public void sendMessage(String queueName, String message) throws Exception { declareBinding(queueName, queueName); CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitAdmin.getRabbitTemplate().convertAndSend(queueName, queueName, message,correlationId); logger.debug("[rabbitmq-sendMessage]queueName:{} ,uuid:{},msg:{}",queueName,correlationId.getId(),message); } /** * 消費者 * @param connectionFactory * @return */ @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); Queue queue = new Queue("myqueue", true, false, false, null); container.setQueues(queue); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); try { //業務邏輯 logger.info("消費 receive msg : " + new String(body)); // 訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //手動確認確認訊息成功消費 } catch (Exception e) { logger.info("消費失敗: " + new String(body)); // ack返回false,並重新回到佇列,api裡面解釋得很清楚 try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (IOException e1) { e1.printStackTrace(); } } } }); return container; } /* //訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //ack返回false,並重新回到佇列,api裡面解釋得很清楚 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //拒絕訊息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); 如果訊息沒有到exchange,則confirm回撥,ack=false 如果訊息到達exchange,則confirm回撥,ack=true exchange到queue成功,則不回撥return exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了) */ }
失敗後return回撥:
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("確認後回撥return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:" +replyText+",exchange:"+exchange+",routingKey:"+routingKey); } }
確認後回撥:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("訊息確認成功cause"+cause); } else { //處理丟失的訊息 System.out.println("訊息確認失敗:"+correlationData.getId()+"#cause"+cause); } } }
相關推薦
Spring Boot與RabbitMQ的整合訊息確認
訊息生產者和消費者import com.rabbitmq.client.Channel;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.
Spring Boot 構建應用——整合訊息中介軟體 RabbitMQ
RabbitMQ 是訊息中介軟體的一種,實現了 AMQP 標準。訊息中介軟體的工作過程可以用生產者-消費者模型來表示。生產者發訊息到訊息佇列中去,消費者監聽指定的訊息佇列,並且當訊息佇列收到訊息之後,接收訊息佇列傳來的訊息,並且給予相應的處理。訊息佇列常用於分散
6、Spring Boot 與MyBatis整合
1.6 Spring Boot 與MyBatis整合 簡介 詳細介紹如何在Spring Boot中整合MyBatis,並通過註解方式實現對映。 完整原始碼: 1.6.1 建立 spring-boot-mybatis 專案 pom檔案如下 <?xml version="1
7、Spring Boot 與 Redis 整合
1.7 Spring Boot 與 Redis 整合 簡介 繼續上篇的MyBatis操作,詳細介紹在Spring Boot中使用RedisCacheManager作為快取管理器,整合業務於一體。 完整原始碼: Spring-Boot-Demos 1.7.1 建立 spring-boot-r
Spring Boot與React整合
前言 前不久學習了Web開發,用React寫了前端,Spring Boot搭建了後端,然而沒有成功地把兩個工程結合起來,造成前端與後端之間需要跨域通訊,帶來了一些額外的工作。 這一次成功地將前端工程與後端結合在一個Project中,記錄一下,也希望能幫到那些和我一樣的入門小白。 環境 Win
Spring Boot與Quartz整合
1.匯入依賴包 <!-- quartz定時器 --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artif
spring boot與junit整合測試
先建立一個REST介面 package com.laoxu.gamedog.controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframew
spring boot 與 mybatis整合
本專案使用的環境: 開發工具:Intellij IDEA 2018.1.3 springboot: 2.0.5.RELEASE jdk:1.8.0_161 maven:3.5.3 首先先建立springboot專案 選擇需要的模組 填寫包名 1.開始在專案
Activiti學習之spring boot 與activiti整合
1、新建spring boot 專案 使用IDEA新建一個spring boot 專案,專案結構如下: 2、增加activiti、jpa、mysql等依賴 <dependencies> <dependency&
如何使用Spring Boot與RabbitMQ結合實現延遲佇列
背景 何為延遲佇列? 顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。 延遲佇列能做什麼?延遲佇列多用於需要延遲工作的場景。最常見的是以下兩種場景:延遲消費。比如: 使用者生成訂單之後
Spring Boot與Log4j2整合之java.lang.IllegalStateException: Logback configuration error detected:
引言: 一個問題的分析與解決過程是表與裡的過程,是一個大膽猜測與小心求證的過程,spring boot與log4j2的整合過程中,我將描述一下分析這個問題的思路和過程。 我一直強調一點: 重要的不是解決問題的結論,而是解決問題的思路和方法,即使在解決完問題之後,
spring boot 與 Mybatis整合(*)
業務層 tomcat ng- quest map big selectall esp 連接 在pom.xml文件中加入數據庫、spring-mybatis整合 <!-- spring boot 整合mybatis --> <de
spring與RabbitMQ整合 消費者消費不到訊息 重啟才能消費到的問題解決
RabbitMQ是當前一個挺火的訊息佇列中介軟體 相比ActiveMQ 訊息更不容易丟失 我之前用的是ActiveMQ 後邊有的時候會莫名其妙的收不到訊息 專案緊後邊也沒時間排查 經朋友的推薦下 換了RabbitMQ 後邊用著也沒啥問題 今天 的Rabbit
Spring Boot與訊息 JMS、AMQP、RabbitMQ簡單概述
一、概述 1. 大多應用中,可通過訊息服務中介軟體來提升系統非同步通訊、擴充套件解耦能力 2. 訊息服務中兩個重要概念: 訊息代理(message broker)和目的地(destination) 當訊息傳送者傳送訊息以後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目
spring boot與jdbcTemplate的整合案例2
database bean n) ret struct mapping rri ott mode 簡單入門了spring boot後,接下來寫寫跟數據庫打交道的案例。博文采用spring的jdbcTemplate工具類與數據庫打交道。 下面是搭建的springbo
spring boot 與 shiro的簡單整合使用
scheduler div turn map 用戶 ttr algorithm pen enc shrio官網:https://shiro.apache.org/ Apache Shiro是一個功能強大且易於使用的Java安全框架,可執行身份驗證,授權,加密和會話管理。借助
022 spring與Rabbitmq整合
ring config resource 進行 virt vat gte urn address 一 .概述 本次我們使用spring幫助我們完成Rabbitmq的使用. 二 .環境的搭建 本次使用springboot的jar文件幫助整合rabbitmq,但是
Apache Shiro(三)——Spring Boot 與 Shiro的 整合
在瞭解了Apache Shiro的架構、認證、授權之後,我們來看一下Shiro與Web的整合。下面以Spring Boot為例,介紹一下Spring Boot 與 Shiro的 整合。 一、建立一個Spring Boot專案 可以使用IDEA快速建立一個Spring Boot專
Spring boot與Redis的整合使用
關於Redis的安裝與叢集部署,可以參考《Linux下Redis的叢集部署》 一、Redis的單機使用 (1) 新建gradle專案,依賴如下: dependencies { compile 'org.springframework.boot:spring-boot-starte
Spring Boot整理——spring boot 與常用元件整合
一、專案搭建 首先建立的Spring Boot工程(war包),然後匯入相關依賴: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/200