1. 程式人生 > >SpringCloud Stream+RabbitMQ訊息分組

SpringCloud Stream+RabbitMQ訊息分組

本篇記錄SpringCloud Stream+RabbitMQ 訊息分組功能的實現。

訊息分組介紹

        通常在生產環境,我們的每個服務都不會以單節點的方式執行在生產環境,當同一個服務啟動多個例項的時候,這些例項都會繫結到同一個訊息通道的目標主題(Topic)上。 預設情況下,當生產者發出一條訊息到繫結通道上,這條訊息會產生多個副本被每個消費者例項接收和處理,但是有些業務場景之下,我們希望生產者產生的訊息只被其中一個例項消費(不管被哪個例項處理),這個時候我們需要為這些消費者設定消費組來實現這樣的功能,實現的方式非常簡單,我們只需要在服務消費者端設定spring.cloud.stream.bindings.{channel-name}.group屬性即可。

        在本篇中有三個專案節點,StreamProvider是訊息生產端,StreamConsumer0和StreamConsumer1是訊息消費端。

1 父maven工程

1.1 工程結構如下:

在這裡插入圖片描述

1.2 pom.xml如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>4.0.0</modelVersion> <groupId>com.study</groupId> <artifactId>cloud-ma</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>pom</packaging> <name>SpringCloudStudy</name> <description>SpringCloudStudy</
description
>
<!-- 私有倉庫的配置 --> <repositories> <repository> <id>nexus</id> <!-- 和setting.xml中配置的id保持一致 --> <url>http://xxx.xxx.xxx.xxx:8081/repository/maven-public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.3.RELEASE</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-cloud.version>Finchley.RELEASE</spring-cloud.version> </properties> <dependencies> <!-- 上邊引入 parent,因此 下邊無需指定版本 --> <!-- 包含 mvc,aop 等jar資源 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion><!-- 去除預設log配置 --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <!-- 配置log4j2 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!-- 配置log4j2 --> <!-- 支援識別yml配置 --> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId> </dependency> <!-- 支援識別yml配置 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 熱部署 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> <scope>true</scope> </dependency> <!--開始 阿里的fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.51</version> </dependency> <!--結束 阿里的fastjson --> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <!-- 沒有該配置,devtools 不生效 --> <fork>true</fork> </configuration> </plugin> </plugins> </build> <modules> <module>EurekaServer</module> <module>EurekaClientHi</module> <module>EurekaClientRibbonCustomer</module> <module>EurekaClientHi2</module> <module>EurekaClientFeignCustomer</module> <module>EurekaClientZuul</module> <module>config_server</module> <module>config-client</module> <module>config-server-svn</module> <module>config-client-svn</module> <module>StreamProvider</module> <module>stream-output</module> <module>stream-input</module> <module>StreamRabbitMQSelf</module> <module>StreamConsumer0</module> <module>StreamConsumer1</module> </modules> </project>

2 StreamProvider工程節點(訊息生產端)

2.1 工程結構

在這裡插入圖片描述

2.2 POM.xml

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.study</groupId>
    <artifactId>cloud-ma</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>StreamProvider</artifactId>
  <packaging>jar</packaging>
  <name>StreamProvider</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

2.3 application.yml

server:
  port: 8089
spring:
  cloud:
    stream:
      binders: 
        defaultRabbit: 
          type: rabbit
          environment:                                      #配置rabbimq連線環境
            spring: 
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                username: mazhen
                password: mazhen
                virtual-host: / 
      bindings: 
        output:                                             #生產者繫結,這個是訊息通道的名稱
          destination: exchange-msg                         #exchange名稱,交換模式預設是topic;把SpringCloud stream的訊息輸出通道繫結到RabbitMQ的exchange-msg交換器。
          content-type: application/json

配置了spring.cloud.stream.bindings.output.destination=exchange-msg 後會在RabbitMQ 中建立一個名為 exchange-msg 交換器(exchange)。spring.cloud.stream.bindings.output.destination=exchange-msg 的意思是把 spring cloud stream 的訊息輸出通道繫結到 RabbitMQ 的 exchange-msg 交換器。

2.4 訊息生產類

2.4.1 訊息生產類—介面

/**
 * 
 */
package com.stream.provider.rabbitMQ.service;

import org.springframework.integration.core.MessageSource;

/**
 * @author mazhen
 *
 */
public interface SendMsg {
	public MessageSource<Integer> timerMessageSource();
}

2.4.2 訊息生產類—實現類

/**
 * 
 */
package com.stream.provider.rabbitMQ.service.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

import com.stream.provider.rabbitMQ.service.SendMsg;

/**
 * @author mazhen
 *
 */
@EnableBinding(value={Source.class})
public class SendMsgImpl implements SendMsg {
   
	private static Logger logger = LoggerFactory.getLogger(SendMsgImpl.class);
	
	private Integer i=0;
	
	@Bean
	@InboundChannelAdapter(value = Source.OUTPUT , poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1"))
	@Override
	public MessageSource<Integer> timerMessageSource() {
		logger.info("傳送訊息:"+i++);
		return () -> new GenericMessage<>(i++);
	}
}

2.5 啟動類

package com.stream.provider;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Hello world!
 *
 */
@SpringBootApplication
public class StreamProviderApplication {
    public static void main( String[] args ) {
        SpringApplication.run(StreamProviderApplication.class, args);
    }
}

3 StreamConsumer0工程(消費端)

3.1 工程結構

在這裡插入圖片描述

3.2 POM.xml

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.study</groupId>
    <artifactId>cloud-ma</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>StreamConsumer0</artifactId>
  <name>StreamConsumer0</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
   
    
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

3.3 application.yml

server:
  port: 8090
spring:
  cloud:
    stream:
      binders: 
        defaultRabbit: 
          type: rabbit
          environment:                                      #配置rabbimq連線環境
            spring: 
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                username: xxx
                password: xxx
                virtual-host: / 
      bindings: 
        input:                                              #生產者繫結,這個是訊息通道的名稱
          group: group-A                                    #該專案節點為訊息組group-A的一個消費端         
          destination: exchange-msg                         #exchange名稱,交換模式預設是topic;把SpringCloud stream的訊息輸入通道繫結到RabbitMQ的exchange-msg交換器。
          content-type: application/json

配置了 spring.cloud.stream.bindings.input.destination=exchange-msg 後會在RabbitMQ 中建立一個名為 exchange-msg 交換器(exchange)。spring.cloud.stream.bindings.input.destination=exchange-msg 的意思是把 spring cloud stream 的輸入通道繫結到 RabbitMQ 的 exchange-msg 交換器。這樣工程節點 StreamConsumer0 的輸入通道對應節點 StreamProvider 的輸出通道,StreamConsumer0 節點就配置成了 StreamProvider 節點的消費端。spring.cloud.stream.bindings.input.group=group-A 配置 StreamConsumer0 為訊息組 group-A 中的一個消費端。這兩個配置項聯合起來解釋,就是把節點 StreamConsumer0 的輸入通道繫結到 RabbitMQ 的 exchange-msg 交換器,並設定為 exchange-msg 交換器中 group-A 訊息消費組中的消費端節點。

3.4 訊息消費類

3.4.1 訊息消費類—介面

/**
 * 
 */

            
           

相關推薦

SpringCloud Stream+RabbitMQ訊息分組

本篇記錄SpringCloud Stream+RabbitMQ 訊息分組功能的實現。 訊息分組介紹         通常在生產環境,我們的每個服務都不會以單節點的方式執行在生產環境,當同一個服務啟動多個例項的時候,這些例項都會繫結到同一

SpringCloud——Stream訊息驅動)

一、Spring Cloud Stream簡介           Spring Cloud Stream 是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程式。他通過使用Spring Integr

Spring Cloud Stream + RabbitMQ 訊息生成和訊息消費

在本 DEMO中有兩個節點互為訊息的生產者和訊息消費者。 一、節點1 1. pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/P

使用SpringCloud Stream結合rabbitMQ實現訊息消費失敗重發機制

> 前言:實際專案中經常遇到訊息消費失敗了,要進行訊息的重發。比如支付訊息消費失敗後,要分不同時間段進行N次的訊息重發提醒。 # 本文模擬場景 1. 當金額少於100時,訊息消費成功 1. 當金額大於100,小於200時,會進行3次重發,第一次1秒;第二次2秒;第三次3秒。 1. 當金額大於200時

SpringCloud Stream生產者配置RabbitMq的動態路由鍵

在寫這個文章前不得不吐槽目前國內一些blog的文章,盡是些複製貼上的文章,提到點上但沒任何的深入和例子。......... 經過測試下來總結一下RabbitMQ的Exchange的特性: 1、direct 生產者可以指定路由鍵,消費者可以指定路由鍵,但不能講路由鍵設定為#(全部)。 2、topic

每天學點SpringCloud(十三):SpringCloud-Stream整合RabbitMQ

我們知道,當微服務越來越來多的時候,僅僅是feign的http呼叫方式已經滿足不了我們的使用場景了。這個時候系統就需要接入訊息中介軟體了。相比較於傳統的Spring專案、SpringBoot專案使用訊息中介軟體的很多配置不同,SpringCloud Stream抽象了中介軟體產品的不同,在SpringClou

springcloud stream rabbit 對訊息中介軟體進一步封裝

為了實現服務之間的解耦,以及動態的切換topic的功能,springcloud提供了stream對訊息中介軟體的封裝,不然rabbitMQ以及Kafka 1.從springcloud-stream的使用來看,引入啟動依賴 2.定義介面來使用註解@input(“streamName”) 返

SpringCloud Stream 訊息同步和非同步

1.非同步的形式有 通知:單向請求,只管傳送不關心結果。 請求/非同步響應:一對一的互動。請求的資訊不回立馬返回,而是過一段時間再返回結果。 訊息:利用訊息可以實現一對多形態的互動。 2.MQ的應用場景;非同步處理、流量削鋒、日誌處理、應用解耦 3.pom檔案引入架包依賴

springcloud實戰之13 rabbitmq訊息匯流排(bus)

rabbitmq是實現了高階訊息佇列協議(amqp)的開源訊息代理軟體,也成為面向訊息的中介軟體。RabbitMQ伺服器是用高效能,可伸縮而聞名的Erlang語言編寫而成的,其叢集和故障轉移是構建在開放電信平臺框架上的。 RabbitMQ的安裝這裡不重複說明,

SpringCloud Stream訊息驅動模組(使用kafka)

說明之前我們使用的是RabbitMQ與Stream的結合完成訊息驅動模組,這次使用Kafka與Stream。目標本文的目的在於結合Kafka與Stream來處理訊息通訊,採取自定義編寫Sink(input)和Source(output)來設定多通道訊息和消費組、消費分割槽等操

springcloud】使用 Spring Cloud Stream 構建訊息驅動微服務

微服務的目的: 鬆耦合 事件驅動的優勢:高度解耦 Spring Cloud Stream 的幾個概念 Spring Cloud Stream is a framework for building message-driven microservice app

SpringCloud-Stream 訊息驅動

# 一、概述 ## 是什麼? Spring Cloud Stream 是一個構建訊息微服務驅動的框架。可以遮蔽底層訊息中介軟體的差異,降低版本切換成本,統一訊息的程式設計模型,目前僅支援 RabbitMQ 和 Kafka。 ## 設計思想 ### 標準 MQ 的設計思想 ![](https://gi

九. SpringCloud Stream訊息驅動

#### 1. 訊息驅動概述 ##### 1.1 是什麼 在實際應用中有很多訊息中介軟體,比如現在企業裡常用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,學習所有這些訊息中介軟體無疑需要大量時間經歷成本,那有沒有一種技術,使我們不再需要關注具體的訊息中介軟體的細節,而只需要用一種

Storm框架:如何消費RabbitMq訊息(程式碼案例)

1、定義拓撲topology public class MessageTopology { public static void main(String[] args) throws Exception { //組裝topology TopologyBuilder

MassTransit RabbitMq 訊息整合命令與事件釋出

一.MassTransit    MassTransit 是一個免費開源輕量級的.net平臺下的訊息匯流排系統。我們將介紹如果使用MassTransit整合我們系統中的事件與命令。專案地址:https://github.com/MassTransit/MassTransit。本文使用一次簡單的下訂

SpringCloud Stream + Rabbit MQ

介紹 通過stream可以讓程式跟具體佇列元件解耦,程式不用關心佇列元件的使用,只要建立好相應的通道,不論佇列元件怎麼更換,程式都無需關心。stream讓程式通過通道來進行訊息的生產和消費。     Stream中的input和output只是個別名,不論生

Redis快取和RabbitMQ訊息解決購車問題(使用者登入,使用者未登入和購物車合併)(轉載)

本文轉自:https://blog.csdn.net/millery22/article/details/49756667;怕以後找不到就cp下來了 在逛各大電商網站的時候,總會有將商品加入購物車,然後合併付款,這個大大的提高了使用者的體驗,某東更是任性,在未登入的情況下都可

Spring Cloud Stream RabbitMQ 配置死信佇列,消費死信佇列

Spring Cloud Stream RabbitMQ 配置死信佇列,消費死信佇列 Application.java package com.buxiaoxia; import lombok.extern.slf4j.Slf4j; import org.springframewo

RabbitMQ訊息中介軟體技術精講無密完結版

第1章 課程介紹 本章首先讓大家徹底明白為什麼學習RabbitMQ,通過本課程的學習具體收穫有哪些?課程內容具體安排與學習建議,然後為大家簡單介紹下業界主流訊息中介軟體有哪些,各自適用場景等。 1-1 課程導學 1-2 業界主流訊息中介軟體介紹 第2章 低門檻,入門RabbitMQ核心概念 本

rabbitmq訊息傳送確認和消費訊息手動刪除訊息

0.application.properties新增如下配置 # 訊息傳送至exchange callback spring.rabbitmq.publisher-confirms=true # 訊息傳送至queue 失敗才callback spring.rabbitmq.publi