SpringCloud——Stream(訊息驅動)
一、Spring Cloud Stream簡介
Spring Cloud Stream 是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程式。他通過使用Spring Integration來連線訊息代理中介軟體以實現訊息事件驅動。Spring Cloud Stream 為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用了釋出-訂閱、消費組、分割槽的三個核心概念。
1、四種整合風格:
- 檔案傳輸:兩個系統生成檔案,檔案的有效負載就是由另一個系統處理的訊息。該類風格的例子之一是針對檔案輪詢目錄或FTP目錄,並處理該檔案。
- 共享資料庫:兩個系統查詢同一個資料庫以獲取要傳遞的資料。一個例子是你部署了兩個EAR應用,它們的實體類(JPA、Hibernate等)共用同一個表。
- 遠端過程呼叫:兩個系統都暴露另一個能呼叫的服務。該類例子有EJB服務,或SOAP和REST服務。
- 訊息:兩個系統連線到一個公用的訊息系統,互相交換資料,並利用訊息呼叫行為。該風格的例子就是眾所周知的中心輻射式的(hub-and-spoke)JMS架構。
2、應用模型
Spring Cloud Stream由一箇中間件中立的核組成。應用通過Spring Cloud Stream插入的input和output通道與外界交流。通道通過指定中介軟體的Binder實現與外部代理連線。業務開發者不再關注具體訊息中介軟體,只需關注Binder對應用程式提供的抽象概念來使用訊息中介軟體實現業務即可。
3、繫結器
通過定義繫結器作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離。通過嚮應用程式暴露統一的Channel通過,是的應用程式不需要再考慮各種不同的訊息中介軟體的實現。當需要升級訊息中介軟體,或者是更換其他訊息中介軟體產品時,我們需要做的就是更換對應的Binder繫結器而不需要修改任何應用邏輯 。
4、釋出-訂閱
應用間通訊遵照發布-訂閱模型,訊息通過共享主題進行廣播。如圖所示,顯示了互動的Spring Cloud Stream 應用的典型佈局。
未處理的感測資料釋出到raw-sensor-data的Topic進行廣播,Averages 和IngestHDFS同時訂閱了此訊息,收到訊息後觸發自身的處理邏輯。
5、消費組
由於釋出-訂閱模型使得共享主題的應用之間連線更簡便,建立給定應用的不同例項來進行彈性擴張的能力也同樣重要。如果存在多個應用例項,那麼同一應用的額不同例項便會成為相互競爭的消費者,其中應該只有一個例項處理給定訊息。 SpringCloud Stream通過消費者組的概念給這種情況進行建模。每一個單獨的消費者可以使用spring.cloud.stream.bindings.input.group屬性來指定一個組名字。如圖中展示的消費者們,這一屬性被設定為spring.cloud.stream.bindings.input.group=hdfsWrite或者spring.cloud.stream.bindings.input.group=average。
所有訂閱給定目標的組都會收到釋出訊息的一個拷貝,但是每一個組內只有一個成員會收到該訊息。預設情況下,如果沒有指定組,Spring Cloud Stream 會將該應用指定給一個匿名的獨立的單成員消費者組,後者與所有其他組都處於一個釋出-訂閱關係中。
6、訊息分割槽
Spring Cloud Stream對給定應用的多個例項之間分隔資料予以支援。一個或者多個生產者應用例項給多個消費者應用例項傳送訊息並確保相同特徵的資料被同一消費者例項處理。 Spring Cloud Stream對分割的程序例項實現進行了抽象。使得Spring Cloud Stream 為不具備分割槽功能的訊息中介軟體(RabbitMQ)也增加了分割槽功能擴充套件。
二、程式碼實現
1、建立消費者(stream-sink)
pom.xml依賴檔案:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.demo.stream</groupId>
<artifactId>stream-sink</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>stream-sink</name>
<description></description>
<parent>
<groupId>com.example.demo</groupId>
<artifactId>springcloud-stream</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml配置檔案:
server:
port: 8081
spring:
application:
name: stream-sink
rabbitmq:
host: 192.168.0.132
port: 5672
username: admin
password: admin
cloud:
stream:
binders:
input:
destination: output
消費者例項SinkReceiver.java檔案:
package com.example.demo.stream.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
* 路徑:com.example.demo.stream.service
* 類名:
* 功能:用來指定一個或多個定義了@Input或@Output註解的介面
* 備註:消費者
* 建立人:typ
* 建立時間:2018/9/25 14:45
* 修改人:
* 修改備註:
* 修改時間:
*/
@EnableBinding(value = {Sink.class})
public class SinkReceiver {
private static final Logger log = LoggerFactory.getLogger(SinkReceiver.class);
/**
* 方法名:
* 功能:將被修飾的方法註冊到訊息中介軟體上的資料流事件監聽器中
* 描述:
* 建立人:typ
* 建立時間:2018/9/25 16:36
* 修改人:
* 修改描述:
* 修改時間:
*/
@StreamListener(Sink.INPUT)
public void receive(String str){
log.info("Received:" + str);
}
}
傳送訊息:
控制檯資料資訊:
2018-09-25 16:46:10.749 INFO 2564 --- [Ye2ZeeV2sRHjA-1] c.e.demo.stream.service.SinkReceiver : Received:Hello World!
2、建立生產者(stream-source)
pom.xml依賴檔案:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.demo.stream</groupId>
<artifactId>stream-source</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>stream-source</name>
<parent>
<groupId>com.example.demo</groupId>
<artifactId>springcloud-stream</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml配置檔案:
server:
port: 8082
spring:
application:
name: stream-source
rabbitmq:
host: 192.168.0.132
port: 5672
username: admin
password: admin
cloud:
stream:
binders:
input:
destination: input
建立生產者例項Sender.java 檔案:
package com.example.demo.stream.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 路徑:com.example.demo.stream.service
* 類名:
* 功能:生產者
* 備註:
* 建立人:typ
* 建立時間:2018/9/25 15:57
* 修改人:
* 修改備註:
* 修改時間:
*/
@EnableBinding(Source.class)
public class Sender {
private static final Logger log = LoggerFactory.getLogger(Sender.class);
@InboundChannelAdapter(value = Source.OUTPUT,poller = @Poller(fixedDelay = "2000"))
public String send(){
String str = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
log.info("send message:"+str);
return str;
}
}
啟動工程: