SpringCloud Stream+RabbitMQ訊息分組
本篇記錄SpringCloud Stream+RabbitMQ 訊息分組功能的實現。
訊息分組介紹
通常在生產環境,我們的每個服務都不會以單節點的方式執行在生產環境,當同一個服務啟動多個例項的時候,這些例項都會繫結到同一個訊息通道的目標主題(Topic)上。 預設情況下,當生產者發出一條訊息到繫結通道上,這條訊息會產生多個副本被每個消費者例項接收和處理,但是有些業務場景之下,我們希望生產者產生的訊息只被其中一個例項消費(不管被哪個例項處理),這個時候我們需要為這些消費者設定消費組來實現這樣的功能,實現的方式非常簡單,我們只需要在服務消費者端設定spring.cloud.stream.bindings.{channel-name}.group屬性即可。
在本篇中有三個專案節點,StreamProvider是訊息生產端,StreamConsumer0和StreamConsumer1是訊息消費端。
1 父maven工程
1.1 工程結構如下:
1.2 pom.xml如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
<modelVersion>4.0.0</modelVersion>
<groupId>com.study</groupId>
<artifactId>cloud-ma</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>SpringCloudStudy</name>
<description>SpringCloudStudy</ description>
<!-- 私有倉庫的配置 -->
<repositories>
<repository>
<id>nexus</id> <!-- 和setting.xml中配置的id保持一致 -->
<url>http://xxx.xxx.xxx.xxx:8081/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
</properties>
<dependencies>
<!-- 上邊引入 parent,因此 下邊無需指定版本 -->
<!-- 包含 mvc,aop 等jar資源 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion><!-- 去除預設log配置 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 配置log4j2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- 配置log4j2 -->
<!-- 支援識別yml配置 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<!-- 支援識別yml配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 熱部署 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
<scope>true</scope>
</dependency>
<!--開始 阿里的fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<!--結束 阿里的fastjson -->
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- 沒有該配置,devtools 不生效 -->
<fork>true</fork>
</configuration>
</plugin>
</plugins>
</build>
<modules>
<module>EurekaServer</module>
<module>EurekaClientHi</module>
<module>EurekaClientRibbonCustomer</module>
<module>EurekaClientHi2</module>
<module>EurekaClientFeignCustomer</module>
<module>EurekaClientZuul</module>
<module>config_server</module>
<module>config-client</module>
<module>config-server-svn</module>
<module>config-client-svn</module>
<module>StreamProvider</module>
<module>stream-output</module>
<module>stream-input</module>
<module>StreamRabbitMQSelf</module>
<module>StreamConsumer0</module>
<module>StreamConsumer1</module>
</modules>
</project>
2 StreamProvider工程節點(訊息生產端)
2.1 工程結構
2.2 POM.xml
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.study</groupId>
<artifactId>cloud-ma</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>StreamProvider</artifactId>
<packaging>jar</packaging>
<name>StreamProvider</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
2.3 application.yml
server:
port: 8089
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment: #配置rabbimq連線環境
spring:
rabbitmq:
host: xxx.xxx.xxx.xxx
username: mazhen
password: mazhen
virtual-host: /
bindings:
output: #生產者繫結,這個是訊息通道的名稱
destination: exchange-msg #exchange名稱,交換模式預設是topic;把SpringCloud stream的訊息輸出通道繫結到RabbitMQ的exchange-msg交換器。
content-type: application/json
配置了spring.cloud.stream.bindings.output.destination=exchange-msg 後會在RabbitMQ 中建立一個名為 exchange-msg 交換器(exchange)。spring.cloud.stream.bindings.output.destination=exchange-msg 的意思是把 spring cloud stream 的訊息輸出通道繫結到 RabbitMQ 的 exchange-msg 交換器。
2.4 訊息生產類
2.4.1 訊息生產類—介面
/**
*
*/
package com.stream.provider.rabbitMQ.service;
import org.springframework.integration.core.MessageSource;
/**
* @author mazhen
*
*/
public interface SendMsg {
public MessageSource<Integer> timerMessageSource();
}
2.4.2 訊息生產類—實現類
/**
*
*/
package com.stream.provider.rabbitMQ.service.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
import com.stream.provider.rabbitMQ.service.SendMsg;
/**
* @author mazhen
*
*/
@EnableBinding(value={Source.class})
public class SendMsgImpl implements SendMsg {
private static Logger logger = LoggerFactory.getLogger(SendMsgImpl.class);
private Integer i=0;
@Bean
@InboundChannelAdapter(value = Source.OUTPUT , poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1"))
@Override
public MessageSource<Integer> timerMessageSource() {
logger.info("傳送訊息:"+i++);
return () -> new GenericMessage<>(i++);
}
}
2.5 啟動類
package com.stream.provider;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Hello world!
*
*/
@SpringBootApplication
public class StreamProviderApplication {
public static void main( String[] args ) {
SpringApplication.run(StreamProviderApplication.class, args);
}
}
3 StreamConsumer0工程(消費端)
3.1 工程結構
3.2 POM.xml
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.study</groupId>
<artifactId>cloud-ma</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>StreamConsumer0</artifactId>
<name>StreamConsumer0</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
3.3 application.yml
server:
port: 8090
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment: #配置rabbimq連線環境
spring:
rabbitmq:
host: xxx.xxx.xxx.xxx
username: xxx
password: xxx
virtual-host: /
bindings:
input: #生產者繫結,這個是訊息通道的名稱
group: group-A #該專案節點為訊息組group-A的一個消費端
destination: exchange-msg #exchange名稱,交換模式預設是topic;把SpringCloud stream的訊息輸入通道繫結到RabbitMQ的exchange-msg交換器。
content-type: application/json
配置了 spring.cloud.stream.bindings.input.destination=exchange-msg 後會在RabbitMQ 中建立一個名為 exchange-msg 交換器(exchange)。spring.cloud.stream.bindings.input.destination=exchange-msg 的意思是把 spring cloud stream 的輸入通道繫結到 RabbitMQ 的 exchange-msg 交換器。這樣工程節點 StreamConsumer0 的輸入通道對應節點 StreamProvider 的輸出通道,StreamConsumer0 節點就配置成了 StreamProvider 節點的消費端。spring.cloud.stream.bindings.input.group=group-A 配置 StreamConsumer0 為訊息組 group-A 中的一個消費端。這兩個配置項聯合起來解釋,就是把節點 StreamConsumer0 的輸入通道繫結到 RabbitMQ 的 exchange-msg 交換器,並設定為 exchange-msg 交換器中 group-A 訊息消費組中的消費端節點。
3.4 訊息消費類
3.4.1 訊息消費類—介面
/**
*
*/
相關推薦
SpringCloud Stream+RabbitMQ訊息分組
本篇記錄SpringCloud Stream+RabbitMQ 訊息分組功能的實現。
訊息分組介紹
通常在生產環境,我們的每個服務都不會以單節點的方式執行在生產環境,當同一個服務啟動多個例項的時候,這些例項都會繫結到同一
SpringCloud——Stream(訊息驅動)
一、Spring Cloud Stream簡介
Spring Cloud Stream 是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程式。他通過使用Spring Integr
Spring Cloud Stream + RabbitMQ 訊息生成和訊息消費
在本 DEMO中有兩個節點互為訊息的生產者和訊息消費者。
一、節點1
1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/P
使用SpringCloud Stream結合rabbitMQ實現訊息消費失敗重發機制
> 前言:實際專案中經常遇到訊息消費失敗了,要進行訊息的重發。比如支付訊息消費失敗後,要分不同時間段進行N次的訊息重發提醒。
# 本文模擬場景
1. 當金額少於100時,訊息消費成功
1. 當金額大於100,小於200時,會進行3次重發,第一次1秒;第二次2秒;第三次3秒。
1. 當金額大於200時
SpringCloud Stream生產者配置RabbitMq的動態路由鍵
在寫這個文章前不得不吐槽目前國內一些blog的文章,盡是些複製貼上的文章,提到點上但沒任何的深入和例子。.........
經過測試下來總結一下RabbitMQ的Exchange的特性:
1、direct
生產者可以指定路由鍵,消費者可以指定路由鍵,但不能講路由鍵設定為#(全部)。
2、topic
每天學點SpringCloud(十三):SpringCloud-Stream整合RabbitMQ
我們知道,當微服務越來越來多的時候,僅僅是feign的http呼叫方式已經滿足不了我們的使用場景了。這個時候系統就需要接入訊息中介軟體了。相比較於傳統的Spring專案、SpringBoot專案使用訊息中介軟體的很多配置不同,SpringCloud Stream抽象了中介軟體產品的不同,在SpringClou
springcloud stream rabbit 對訊息中介軟體進一步封裝
為了實現服務之間的解耦,以及動態的切換topic的功能,springcloud提供了stream對訊息中介軟體的封裝,不然rabbitMQ以及Kafka 1.從springcloud-stream的使用來看,引入啟動依賴 2.定義介面來使用註解@input(“streamName”) 返
SpringCloud Stream 訊息同步和非同步
1.非同步的形式有
通知:單向請求,只管傳送不關心結果。
請求/非同步響應:一對一的互動。請求的資訊不回立馬返回,而是過一段時間再返回結果。
訊息:利用訊息可以實現一對多形態的互動。
2.MQ的應用場景;非同步處理、流量削鋒、日誌處理、應用解耦
3.pom檔案引入架包依賴
springcloud實戰之13 rabbitmq訊息匯流排(bus)
rabbitmq是實現了高階訊息佇列協議(amqp)的開源訊息代理軟體,也成為面向訊息的中介軟體。RabbitMQ伺服器是用高效能,可伸縮而聞名的Erlang語言編寫而成的,其叢集和故障轉移是構建在開放電信平臺框架上的。
RabbitMQ的安裝這裡不重複說明,
SpringCloud Stream訊息驅動模組(使用kafka)
說明之前我們使用的是RabbitMQ與Stream的結合完成訊息驅動模組,這次使用Kafka與Stream。目標本文的目的在於結合Kafka與Stream來處理訊息通訊,採取自定義編寫Sink(input)和Source(output)來設定多通道訊息和消費組、消費分割槽等操
【springcloud】使用 Spring Cloud Stream 構建訊息驅動微服務
微服務的目的: 鬆耦合
事件驅動的優勢:高度解耦
Spring Cloud Stream 的幾個概念
Spring Cloud Stream is a framework for building message-driven microservice app
SpringCloud-Stream 訊息驅動
# 一、概述
## 是什麼?
Spring Cloud Stream 是一個構建訊息微服務驅動的框架。可以遮蔽底層訊息中介軟體的差異,降低版本切換成本,統一訊息的程式設計模型,目前僅支援 RabbitMQ 和 Kafka。
## 設計思想
### 標準 MQ 的設計思想
![](https://gi
九. SpringCloud Stream訊息驅動
#### 1. 訊息驅動概述
##### 1.1 是什麼
在實際應用中有很多訊息中介軟體,比如現在企業裡常用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,學習所有這些訊息中介軟體無疑需要大量時間經歷成本,那有沒有一種技術,使我們不再需要關注具體的訊息中介軟體的細節,而只需要用一種
Storm框架:如何消費RabbitMq訊息(程式碼案例)
1、定義拓撲topology
public class MessageTopology {
public static void main(String[] args) throws Exception {
//組裝topology
TopologyBuilder
MassTransit RabbitMq 訊息整合命令與事件釋出
一.MassTransit
MassTransit 是一個免費開源輕量級的.net平臺下的訊息匯流排系統。我們將介紹如果使用MassTransit整合我們系統中的事件與命令。專案地址:https://github.com/MassTransit/MassTransit。本文使用一次簡單的下訂
SpringCloud Stream + Rabbit MQ
介紹
通過stream可以讓程式跟具體佇列元件解耦,程式不用關心佇列元件的使用,只要建立好相應的通道,不論佇列元件怎麼更換,程式都無需關心。stream讓程式通過通道來進行訊息的生產和消費。
Stream中的input和output只是個別名,不論生
Redis快取和RabbitMQ訊息解決購車問題(使用者登入,使用者未登入和購物車合併)(轉載)
本文轉自:https://blog.csdn.net/millery22/article/details/49756667;怕以後找不到就cp下來了
在逛各大電商網站的時候,總會有將商品加入購物車,然後合併付款,這個大大的提高了使用者的體驗,某東更是任性,在未登入的情況下都可
Spring Cloud Stream RabbitMQ 配置死信佇列,消費死信佇列
Spring Cloud Stream RabbitMQ 配置死信佇列,消費死信佇列
Application.java
package com.buxiaoxia;
import lombok.extern.slf4j.Slf4j;
import org.springframewo
RabbitMQ訊息中介軟體技術精講無密完結版
第1章 課程介紹 本章首先讓大家徹底明白為什麼學習RabbitMQ,通過本課程的學習具體收穫有哪些?課程內容具體安排與學習建議,然後為大家簡單介紹下業界主流訊息中介軟體有哪些,各自適用場景等。 1-1 課程導學 1-2 業界主流訊息中介軟體介紹 第2章 低門檻,入門RabbitMQ核心概念 本
rabbitmq訊息傳送確認和消費訊息手動刪除訊息
0.application.properties新增如下配置
# 訊息傳送至exchange callback
spring.rabbitmq.publisher-confirms=true
# 訊息傳送至queue 失敗才callback
spring.rabbitmq.publi