1. 程式人生 > >rabbitmq結合springboot實現ACK機制的消費機制

rabbitmq結合springboot實現ACK機制的消費機制

###1.RabbitMQ介紹

RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。RabbitMQ主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生資料時,消費者無法快速消費,那麼需要一箇中間層。儲存這個資料。

AMQP,即Advanced Message Queuing Protocol,高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。

RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。

###2.AmqpTemplate,RabbitTemplate
Spring AMQP提供了一個傳送和接收訊息的操作模板類AmqpTemplate。 AmqpTemplate它定義包含了傳送和接收訊息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一個實現。

RabbitTemplate支援訊息的確認與返回,為了返回訊息,RabbitTemplate 需要設定mandatory 屬性為true,並且CachingConnectionFactory 的publisherReturns屬性也需要設定為true。返回的訊息會根據它註冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,

一個RabbitTemplate僅能支援一個ReturnCallback 。

為了確認Confirms訊息, CachingConnectionFactory 的publisherConfirms 屬性也需要設定為true,確認的訊息會根據它註冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支援一個ConfirmCallback.

###3.SpringBoot整合RabbitMQ

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
 
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
 
	<name>rabbitMQ</name>
	<description>Demo project for Spring Boot</description>
 
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
 
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>
 
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
 
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
 
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
 
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
 
 
</project>

自動配置資訊 這裡我開啟ACK訊息確認

server.port=8083
#伺服器配置
spring.application.name=rabbitmq-hello-sending
#rabbitmq連線引數
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=linpeng
spring.rabbitmq.password=123456
# 開啟發送確認
spring.rabbitmq.publisher-confirms=true
# 開啟發送失敗退回
spring.rabbitmq.publisher-returns=true
# 開啟ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

建立訊息佇列 佇列名:hello 和 helloObj

package com.example.demo;
 
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitConfig {
 
    @Bean
    public Queue QueueA() {
        return new Queue("hello");
    }
 
    @Bean
    public Queue QueueB() {
        return new Queue("helloObj");
    }
 
    /**
     * Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送訊息,綁定了這個交換機的所有佇列都收到這個訊息。
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("ABExchange");
    }
 
 
    @Bean
    Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(QueueA).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(QueueB).to(fanoutExchange);
    }
}

訊息傳送者 Sender 使用 RabbitTemplate 不採用 AmqpTemplate

package com.example.demo;
 
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
 
import java.util.Date;
//RabbitTemplate.ConfirmCallback
@Service
public class HelloSender implements RabbitTemplate.ReturnCallback {
 
    @Autowired
    //private AmqpTemplate rabbitTemplate;
    private RabbitTemplate rabbitTemplate;
    public void send() {
        String context = "你好現在是 " + new Date() +"";
        System.out.println("HelloSender傳送內容 : " + context);
         // this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                System.out.println("HelloSender訊息傳送失敗" + cause + correlationData.toString());
            } else {
                System.out.println("HelloSender 訊息傳送成功 ");
            }
        });
        this.rabbitTemplate.convertAndSend("hello", context);
    }
 
    public void sendObj() {
       MessageObj obj = new MessageObj();
       obj.setACK(false);
       obj.setId(123);
       obj.setName("zhangsan");
       obj.setValue("data");
       System.out.println("傳送 : " + obj);
       this.rabbitTemplate.convertAndSend("helloObj", obj);
    }
 
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("sender return success" + message.toString()+"==="+i+"==="+s1+"==="+s2);
    }
 
//    @Override
//    public void confirm(CorrelationData correlationData, boolean b, String s) {
//        System.out.println("sender success");
//    }
 
}

訊息接受者 Receiver 註解方式接受訊息

package com.example.demo;
 
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
 
import java.io.IOException;
import java.util.Date;
import java.util.Map;
 
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
 
    @RabbitHandler
    public void process(String hello,Channel channel, Message message) throws IOException {
        System.out.println("HelloReceiver收到  : " + hello +"收到時間"+new Date());
        try {
            //告訴伺服器收到這條訊息 已經被我消費了 可以在佇列刪掉 這樣以後就不會再發了 否則訊息伺服器以為這條訊息沒處理掉 後續還會在發
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("receiver success");
        } catch (IOException e) {
            e.printStackTrace();
            //丟棄這條訊息
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            System.out.println("receiver fail");
        }
 
    }
}

備註:我們用註解的方式來接受訊息 就不要用 自己建立物件實現ChannelAwareMessageListener的方式來接受訊息 這種方式還要去全局裡面配置 麻煩,直接用@RabbitListener(queues = “hello”)最簡單

訊息確認 因為我在屬性配置檔案裡面開啟了ACK確認 所以如果程式碼沒有執行ACK確認 你在RabbitMQ的後臺會看到訊息會一直留在佇列裡面未消費掉 只要程式一啟動開始接受該佇列訊息的時候 又會收到

 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

//ack返回false,並重新回到佇列,api裡面解釋得很清楚

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

//拒絕訊息

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

TestController測試

@Autowired
    private HelloSender helloSender;
 
    /**
     * 單生產者-單個消費者
     */
    @RequestMapping("/test")
    public void hello() throws Exception {
        helloSender.send();
    }

相關推薦

rabbitmq結合springboot實現ACK機制消費機制

###1.RabbitMQ介紹 RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。RabbitMQ主要是為了實現系統之間的雙向解耦而實現的。

java操作樹莓派GPIO控制LED燈--結合springboot實現介面呼叫

1、概述 本文使用java結合springboot實現了對樹莓派GPIO介面的操作以達到控制LED燈的功能 2、pom檔案如下: <project xmlns="http://maven.apache.org/POM/4.0.0"      &nb

第5篇 RabbitMQ整合SpringBoot實現Direct模式

直接程式碼  專案結構 pom需要增加對RabbitM的支援 Pom檔案如下 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/P

RabbitMq + Spring 實現ACK機制

概念性解讀(Ack的靈活)          首先啊,有的人不是太理解這個Ack是什麼,講的接地氣一點,其實就是一個通知,怎麼說呢,當我監聽消費者,正常情況下,不會出異常,但是如果是出現了異常,甚至是沒有獲取的異常,那是不是這條資料就會作廢,但是我們肯定不希望這樣的情況

RabbitMQ ——與SpringBoot整合並實現訊息確認機制

不囉嗦直接上程式碼 目錄結構如下: pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instanc

SpringBoot 整合 RabbitMQ(包含三種訊息確認機制以及消費端限流)

目錄 說明 生產端 消費端 說明 本文 SpringBoot 與 RabbitMQ 進行整合的時候,包含了三種訊息的確認模式,如果查詢詳細的確認模式設定,請閱讀:RabbitMQ的三種訊息確認

java rabbitmq ack訊息確認機制

ackage com.example.demo.ConsumerDemo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframewo

RabbitMQ ACK 訊息確認機制

注意如果拋異常或unack(並且requeue為true),訊息會一直重新入佇列,一不小心就會xxxxx一大堆訊息不斷重複~。 //訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息 (正常消費) channel.basicAck(message.getMess

RabbitMQ中RPC的實現及其通信機制

pub elf tcl consumer 兩個 rabbit client margin result RabbitMQ中RPC的實現:客戶端發送請求消息,服務端回復響應消息,為了接受響應response,客戶端需要發送一個回調隊列的地址來接受響應,每條消息在發送的時候會帶

Springboot整合Rabbitmq實現延時消費,並實現可靠的訊息處理

一、Rabbitmq簡介1.1 rabbitmq 架構1.2 rabbitmq相關元件介紹exchange: 交換機,主要用來將生產者傳送的訊息路由給伺服器中的佇列。routing-key: 訊息路由的key,生產者在將訊息發到到exchange的時候,需要指定routing

Map實現java緩存機制的簡單實例

個數 charset shm run 測試的 分享 初始化 tty tco 緩存是Java中主要的內容,主要目的是緩解項目訪問數據庫的壓力以及提升訪問數據的效率,以下是通過Map實現java緩存的功能,並沒有用cache相關框架。 一、緩存管理類 CacheMgr.java

反匯編探索C++虛函數實現多態的機制

虛函數虛函數是C++實現多態的機制,那麽它是如何做到的呢?以下通過反匯編探索虛函數內存模型,查看虛函數實現多態的過程。工具Visual studio 2017:以下程序僅做VC++編譯器下的32位程序探討,其他編譯器與64位程序所產生的差異不作討論。反匯編過程首先聲明一個不包含虛函數的簡單C++類,如下:在構

WEB開發----springboot的登錄攔截機制

mod dep spring pretty IE view .config comm 登錄界面 如果是一個後臺的管理項目的,有些東西是不能直接就可以訪問的,必須要登錄才可以進去,所以就需要進行登錄攔截,只有登錄過的用戶才可以正常訪問. 登錄攔

Java Springboot結合FastDFS實現檔案上傳以及根據圖片url將圖片上傳至圖片伺服器

上一篇文章我們已經講解了如何搭建FastDFS圖片伺服器,環境我們準備好了現在就讓我們開始與Java結合將他應用到實際的專案中吧。本篇文章我們將會展示上傳圖片到FastDFS圖片伺服器以及通過外網的圖片url將圖片上傳至我們自己的圖片伺服器中。 1.建立springbo

SpringBoot結合Flyway實現資料庫版本管理及配置檔案說明

SpringBoot結合Flyway實現資料庫版本管理及配置檔案說明 文章目錄 前言 例項 其它配置 拓展 前言 Flyway是個很好的資料庫版本管理工具,根據版本號順序執行sql檔案,維護一個統一的資料庫,適用於多人協作

java——利用生產者消費者模式思想實現簡易版handler機制

參考教程:http://www.sohu.com/a/237792762_659256 首先介紹每一個類: 1.Message:   這個類的作用是儲存一個生產者生產出來的具體的訊息,就類似連結串列佇列中的一個節點,自行定義需要儲存的內容。   code:訊息要執行的具體動作程式碼   msg:訊息

java 筆試 java中實現多型的機制是什麼

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

muduo網路庫學習筆記(四) 通過eventfd實現的事件通知機制

目錄 muduo網路庫學習筆記(四) 通過eventfd實現的事件通知機制 eventfd的使用 eventfd系統函式 使用示例 EventLoop對eventfd的封裝 工作時序 runInLoo

muduo網絡庫學習筆記(四) 通過eventfd實現的事件通知機制

添加 最大的 atom times sin eas functor lee 單純 目錄 muduo網絡庫學習筆記(四) 通過eventfd實現的事件通知機制 eventfd的使用 eventfd系統函數 使用示例 EventLoop對eventfd的封裝 工作時序 ru

用wireshark抓包分析TCP三次握手、四次揮手以及TCP實現可靠傳輸的機制(轉)

關於TCP三次握手和四次揮手大家都在《計算機網路》課程裡學過,還記得當時高超老師耐心地講解。大學裡我遇到的最好的老師大概就是這位了,雖然他只給我講過《java程式設計》和《計算機網路》,但每次課幾乎都動手敲程式碼或者當場做實驗。好了不扯了,下面進入正題。      關