1. 程式人生 > >Spring Cloud Stream Elmhurst SR1 翻譯

Spring Cloud Stream Elmhurst SR1 翻譯

Spring Cloud Stream Reference Guide

Table of Contents

Spring Cloud Stream Core

1. Quick Start

You can try Spring Cloud Stream in less then 5 min even before you jump into any details by following this three-step guide.

 

您可以在不到5分鐘的時間內嘗試Spring Cloud Stream,甚至在您按照這個三步指南跳轉到任何細節之前。

 

We show you how to create a Spring Cloud Stream application that receives messages coming from the messaging middleware of your choice (more on this later) and logs received messages to the console. We call it LoggingConsumer. While not very practical, it provides a good introduction to some of the main concepts and abstractions, making it easier to digest the rest of this user guide.

 

我們將向您展示如何建立一個Spring Cloud Stream應用程式,該應用程式接收來自您選擇的訊息傳遞中介軟體的訊息(稍後將詳細介紹)並將收到的訊息記錄到控制檯。我們稱之為LoggingConsumer。雖然不太實用,但它提供了一些主要概念和抽象的良好介紹,使其更容易消化本使用者指南的其餘部分。

 

The three steps are as follows:

  1. Creating a Sample Application by Using Spring Initializr
  2. Importing the Project into Your IDE
  3. Adding a Message Handler, Building, and Running

 

這三個步驟如下:

  1. 使用Spring Initializr建立示例應用程式
  2. 將專案匯入IDE
  3. 新增訊息處理程式,構建和執行

 

1.1. Creating a Sample Application by Using Spring Initializr

 

To get started, visit the Spring Initializr. From there, you can generate our LoggingConsumer application. To do so:

  1. In the Dependencies section, start typing stream. When the “Cloud Stream” option should appears, select it.
  2. Start typing either 'kafka' or 'rabbit'.
  3. Select “Kafka” or “RabbitMQ”.
    Basically, you choose the messaging middleware to which your application binds. We recommend using the one you have already installed or feel more comfortable with installing and running. Also, as you can see from the Initilaizer screen, there are a few other options you can choose. For example, you can choose Gradle as your build tool instead of Maven (the default).
  4. In the Artifact field, type 'logging-consumer'.
    The value of the Artifact field becomes the application name. If you chose RabbitMQ for the middleware, your Spring Initializr should now be as follows:

  1. Click the Generate Project button.
    Doing so downloads the zipped version of the generated project to your hard drive.
  2. Unzip the file into the folder you want to use as your project directory.

 

要開始使用,請訪問Spring Initializr。從那裡,您可以生成我們的LoggingConsumer應用程式。為此:

  1. 在“ 依賴關係部分中,開始鍵入stream。當出現“Cloud Stream”選項時,選擇它。
  2. 開始輸入'kafka'或'rabbit'。
  3. 選擇“Kafka”或“RabbitMQ”。

基本上,您選擇應用程式繫結的訊息傳遞中介軟體。我們建議您使用已安裝的或安裝和執行時感覺更舒適。此外,從Initilaizer螢幕中可以看到,您可以選擇其他一些選項。例如,您可以選擇Gradle作為構建工具而不是Maven(預設值)。

  1. 在“ 工件欄位中,鍵入“logging-consumer”。

Artifact欄位的值成為應用程式名稱。如果你選擇RabbitMQ作為中介軟體,你的Spring Initializr現在應該如下:

  1. 單擊“ 生成專案按鈕。

這樣做會將生成的專案的壓縮版本下載到硬碟驅動器。

  1. 將檔案解壓縮到要用作專案目錄的資料夾中。

 

We encourage you to explore the many possibilities available in the Spring Initializr. It lets you create many different kinds of Spring applications.

我們鼓勵您探索Spring Initializr中的許多可能性。它允許您建立許多不同型別的Spring應用程式。

 

1.2. Importing the Project into Your IDE   將專案匯入IDE

 

Now you can import the project into your IDE. Keep in mind that, depending on the IDE, you may need to follow a specific import procedure. For example, depending on how the project was generated (Maven or Gradle), you may need to follow specific import procedure (for example, in Eclipse or STS, you need to use File → Import → Maven → Existing Maven Project).

現在,您可以將專案匯入IDE。請記住,根據IDE,您可能需要遵循特定的匯入過程。例如,根據專案的生成方式(Maven或Gradle),您可能需要遵循特定的匯入過程(例如,在Eclipse或STS中,您需要使用File→Import→Maven→Existing Maven Project)。

 

Once imported, the project must have no errors of any kind. Also, src/main/java should contain com.example.loggingconsumer.LoggingConsumerApplication.

匯入後,專案必須沒有任何錯誤。另外,src/main/java應該包含com.example.loggingconsumer.LoggingConsumerApplication。

 

Technically, at this point, you can run the application’s main class. It is already a valid Spring Boot application. However, it does not do anything, so we want to add some code.

從技術上講,此時,您可以執行應用程式的主類。它已經是一個有效的Spring Boot應用程式。但是,它沒有做任何事情,所以我們想新增一些程式碼。

 

1.3. Adding a Message Handler, Building, and Running   新增訊息處理器,構建,並執行

 

Modify the com.example.loggingconsumer.LoggingConsumerApplication class to look as follows:

將com.example.loggingconsumer.LoggingConsumerApplication類修改為如下所示:

 

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

public static void main(String[] args) {
                SpringApplication.run(LoggingConsumerApplication.class, args);
        }

@StreamListener(Sink.INPUT)
        public void handle(Person person) {
                System.out.println("Received: " + person);
        }

public static class Person {
                private String name;
                public String getName() {
                        return name;
                }
                public void setName(String name) {
                        this.name = name;
                }
                public String toString() {
                        return this.name;
                }
        }
}

 

As you can see from the preceding listing:

  • We have enabled Sink binding (input-no-output) by using @EnableBinding(Sink.class). Doing so signals to the framework to initiate binding to the messaging middleware, where it automatically creates the destination (that is, queue, topic, and others) that are bound to the Sink.INPUT channel.
  • We have added a handler method to receive incoming messages of type Person. Doing so lets you see one of the core features of the framework: It tries to automatically convert incoming message payloads to type Person.

 

從前面的清單中可以看出:

  • 我們Sink通過使用啟用了繫結(輸入 - 無輸出)@EnableBinding(Sink.class)。這樣做會向框架發出訊號,以啟動與訊息傳遞中介軟體的繫結,從而自動建立繫結到Sink.INPUT通道的目標(即佇列,主題和其他)。
  • 我們添加了一種handler方法來接收型別的傳入訊息Person。這樣做可以讓您看到框架的核心功能之一:它嘗試自動將傳入的訊息有效負載轉換為型別Person。

 

You now have a fully functional Spring Cloud Stream application that does listens for messages. From here, for simplicity, we assume you selected RabbitMQ in step one. Assuming you have RabbitMQ installed and running, you can start the application by running its main method in your IDE.

您現在擁有一個功能齊全的Spring Cloud Stream應用程式,可以偵聽訊息。從這裡開始,為簡單起見,我們假設您在第一步中選擇了RabbitMQ 。假設您已安裝並執行RabbitMQ,則可以通過main在IDE中執行其方法來啟動應用程式。

 

You should see following output:

你應該看到以下輸出:

 

        --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
        --- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
        --- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/[email protected] . .
        . . .
        --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
        . . .
        --- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

 

Go to the RabbitMQ management console or any other RabbitMQ client and send a message to input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg. The anonymous.CbMIwdkJSBO1ZoPDOtHtCg part represents the group name and is generated, so it is bound to be different in your environment. For something more predictable, you can use an explicit group name by setting spring.cloud.stream.bindings.input.group=hello (or whatever name you like).

轉到RabbitMQ管理控制檯或任何其他RabbitMQ客戶端併發送訊息input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg。該anonymous.CbMIwdkJSBO1ZoPDOtHtCg部分代表組名稱並生成,因此它在您的環境中必然會有所不同。對於更可預測的內容,您可以通過設定spring.cloud.stream.bindings.input.group=hello(或任何您喜歡的名稱)來使用顯式組名。

 

The contents of the message should be a JSON representation of the Person class, as follows:

訊息的內容應該是類的JSON表示Person,如下所示:

 

{"name":"Sam Spade"}

 

Then, in your console, you should see:

然後,在您的控制檯中,您應該看到:

 

Received: Sam Spade

 

You can also build and package your application into a boot jar (by using ./mvnw clean install) and run the built JAR by using the java -jar command.

您還可以將應用程式構建並打包到引導jar中(通過使用./mvnw clean install),並使用該java -jar命令執行構建的JAR 。

 

Now you have a working (albeit very basic) Spring Cloud Stream application.

現在您有一個工作(儘管非常基本的)Spring Cloud Stream應用程式。

 

2. What’s New in 2.0?

 

Spring Cloud Stream introduces a number of new features, enhancements, and changes. The following sections outline the most notable ones:

 

Spring Cloud Stream引入了許多新功能,增強功能和更改。以下部分概述了最值得注意的部分:

 

2.1. New Features and Components   新功能和元件

 

  • Polling Consumers: Introduction of polled consumers, which lets the application control message processing rates. See “Using Polled Consumers” for more details. You can also read this blog post for more details.
  • Micrometer Support: Metrics has been switched to use Micrometer. MeterRegistry is also provided as a bean so that custom applications can autowire it to capture custom metrics. See “Metrics Emitter” for more details.
  • New Actuator Binding Controls: New actuator binding controls let you both visualize and control the Bindings lifecycle. For more details, see Binding visualization and control.
  • Configurable RetryTemplate: Aside from providing properties to configure RetryTemplate, we now let you provide your own template, effectively overriding the one provided by the framework. To use it, configure it as a @Bean in your application.

 

  • 輪詢消費者:引入輪詢的消費者,讓應用程式控制訊息處理速率。有關詳細資訊,請參閱“ 使用輪詢的使用者 ”。您還可以閱讀此部落格文章瞭解更多詳情。
  • 千分尺支援:度量標準已切換為使用千分尺。 MeterRegistry也作為bean提供,以便自定義應用程式可以自動裝配它以捕獲自定義指標。有關詳細資訊,請參閱“ 度量標準發射器 ”。
  • 新的執行器繫結控制元件:新的執行器繫結控制元件可讓您視覺化和控制Bindings生命週期。有關更多詳細資訊,請參閱繫結視覺化和控制元件
  • 可配置的RetryTemplate:除了提供要配置的屬性之外RetryTemplate,我們現在允許您提供自己的模板,有效地覆蓋框架提供的模板。要使用它,請@Bean在應用程式中將其配置為a 。

 

2.2. Notable Enhancements   值得注意的增強功能

 

This version includes the following notable enhancements:

 

此版本包括以下顯著增強功能:

 

2.2.1. Both Actuator and Web Dependencies Are Now Optional

 

This change slims down the footprint of the deployed application in the event neither actuator nor web dependencies required. It also lets you switch between the reactive and conventional web paradigms by manually adding one of the following dependencies.

如果不需要執行器或Web依賴性,則此更改會減少已部署應用程式的佔用空間。它還允許您通過手動新增以下依賴項之一在響應和傳統Web範例之間切換。

 

The following listing shows how to add the conventional web framework:

以下清單顯示瞭如何新增傳統的Web框架:

 

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
</dependency>

 

The following listing shows how to add the reactive web framework:

以下清單顯示瞭如何新增響應式Web框架:

 

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

 

The following list shows how to add the actuator dependency:

以下列表顯示瞭如何新增執行器依賴項:

 

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

 

2.2.2. Content-type Negotiation Improvements   內容型別協商改進

 

One of the core themes for verion 2.0 is improvements (in both consistency and performance) around content-type negotiation and message conversion. The following summary outlines the notable changes and improvements in this area. See the “Content Type Negotiation” section for more details. Also this blog post contains more detail.

  • All message conversion is now handled only by MessageConverter objects.
  • We introduced the @StreamMessageConverter annotation to provide custom MessageConverter objects.
  • We introduced the default Content Type as application/json, which needs to be taken into consideration when migrating 1.3 application or operating in the mixed mode (that is, 1.3 producer → 2.0 consumer).
  • Messages with textual payloads and a contentType of text/…​ or …​/json are no longer converted to Message<String> for cases where the argument type of the provided MessageHandler can not be determined (that is, public void handle(Message<?> message) or public void handle(Object payload)). Furthermore, a strong argument type may not be enough to properly convert messages, so the contentTypeheader may be used as a supplement by some MessageConverters.

 

verion 2.0的核心主題之一是圍繞內容型別協商和訊息轉換的改進(在一致性和效能方面)。以下摘要概述了該領域的顯著變化和改進。有關詳細資訊,請參閱“ 內容型別協商 ”部分。另外這個部落格帖子中包含更多細節。

  • 現在,所有訊息轉換由MessageConverter物件處理。
  • 我們引入了@StreamMessageConverter註釋來提供自定義MessageConverter物件。
  • 我們引入了預設值Content Typeas application/json,在遷移1.3應用程式或在混合模式下執行時需要考慮這一點(即1.3生產者→2.0使用者)。
  • 與文字訊息的有效載荷和contentType的text/…​或…​/json不再轉換為Message<String>對於其中提供的引數型別的情況下MessageHandler不能確定(即,public void handle(Message<?> message)或public void handle(Object payload))。此外,強大的引數型別可能不足以正確轉換訊息,因此contentType標題可能被某些人用作補充MessageConverters。

 

2.3. Notable Deprecations   值得注意的廢棄

 

As of version 2.0, the following items have been deprecated:

 

從2.0版開始,不推薦使用以下專案:

 

2.3.1. Java Serialization (Java Native and Kryo)   Java序列化()

 

JavaSerializationMessageConverter and KryoMessageConverter remain for now. However, we plan to move them out of the core packages and support in the future. The main reason for this deprecation is to flag the issue that type-based, language-specific serialization could cause in distributed environments, where Producers and Consumers may depend on different JVM versions or have different versions of supporting libraries (that is, Kryo). We also wanted to draw the attention to the fact that Consumers and Producers may not even be Java-based, so polyglot style serialization (i.e., JSON) is better suited.

 

JavaSerializationMessageConverter並KryoMessageConverter保持現在。但是,我們計劃在未來將它們從核心軟體包和支援中移除。這種棄用的主要原因是標記基於型別的,特定於語言的序列化可能在分散式環境中引起的問題,其中生產者和消費者可能依賴於不同的JVM版本或具有不同版本的支援庫(即Kryo)。我們還想提請注意消費者和生產者甚至可能不是基於Java的事實,因此多語言樣式序列化(即JSON)更適合。

 

2.3.2. Deprecated Classes and Methods   不推薦使用的類和方法

 

The following is a quick summary of notable deprecations. See the corresponding {spring-cloud-stream-javadoc-current}[javadoc] for more details.

  • SharedChannelRegistry. Use SharedBindingTargetRegistry.
  • Bindings. Beans qualified by it are already uniquely identified by their type — for example, provided Source, Processor, or custom bindings:

public interface Sample {
        String OUTPUT = "sampleOutput";

@Output(Sample.OUTPUT)
        MessageChannel output();
}

  • HeaderMode.raw. Use none, headers or embeddedHeaders
  • ProducerProperties.partitionKeyExtractorClass in favor of partitionKeyExtractorName and ProducerProperties.partitionSelectorClass in favor of partitionSelectorName. This change ensures that both components are Spring configured and managed and are referenced in a Spring-friendly way.
  • BinderAwareRouterBeanPostProcessor. While the component remains, it is no longer a BeanPostProcessorand will be renamed in the future.
  • BinderProperties.setEnvironment(Properties environment). Use BinderProperties.setEnvironment(Map<String, Object> environment).

 

以下是顯著棄用的快速摘要。有關更多詳細資訊,請參閱相應的{spring-cloud-stream-javadoc-current} [javadoc]。

  • SharedChannelRegistry。使用SharedBindingTargetRegistry。
  • Bindings。通過它合格豆已經通過獨特的型別識別-例如,提供Source,Processor或自定義繫結:

public interface Sample {

    String OUTPUT =“sampleOutput”;

 

    @Output(Sample.OUTPUT)

    MessageChannel輸出();

}

  • HeaderMode.raw。使用none,headers或embeddedHeaders
  • ProducerProperties.partitionKeyExtractorClass支援partitionKeyExtractorName和ProducerProperties.partitionSelectorClass贊成partitionSelectorName。此更改確保兩個元件都是Spring配置和管理的,並以Spring友好的方式引用。
  • BinderAwareRouterBeanPostProcessor。雖然該元件仍然存在,但它不再是一個元件,BeanPostProcessor並且將來會重新命名。
  • BinderProperties.setEnvironment(Properties environment)。使用BinderProperties.setEnvironment(Map<String, Object> environment)。

 

This section goes into more detail about how you can work with Spring Cloud Stream. It covers topics such as creating and running stream applications.

本節詳細介紹瞭如何使用Spring Cloud Stream。它涵蓋了建立和執行流應用程式等主題。

 

3. Introducing Spring Cloud Stream   介紹Spring Cloud Stream

 

Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.

 

Spring Cloud Stream是一個用於構建訊息驅動的微服務應用程式的框架。Spring Cloud Stream構建於Spring Boot之上,用於建立獨立的生產級Spring應用程式,並使用Spring Integration提供與訊息代理的連線。它提供了來自多個供應商的中介軟體的固定配置,介紹了持久性發布 - 訂閱語義,消費者組,以及分割槽的概念。

 

You can add the @EnableBinding annotation to your application to get immediate connectivity to a message broker, and you can add @StreamListener to a method to cause it to receive events for stream processing. The following example shows a sink application that receives external messages:

 

您可以將@EnableBinding註解新增到應用程式以立即連線到訊息代理,並且可以將@StreamListener註解新增到方法以使其接收流處理事件。以下示例顯示了接收外部訊息的接收器應用程式:

 

@SpringBootApplication

@EnableBinding(Sink.class)

public class VoteRecordingSinkApplication {

 

  public static void main(String[] args) {

    SpringApplication.run(VoteRecordingSinkApplication.class, args);

  }

 

  @StreamListener(Sink.INPUT)

  public void processVote(Vote vote) {

      votingService.recordVote(vote);

  }

}

 

The @EnableBinding annotation takes one or more interfaces as parameters (in this case, the parameter is a single Sink interface). An interface declares input and output channels. Spring Cloud Stream provides the Source, Sink, and Processor interfaces. You can also define your own interfaces.

 

@EnableBinding註解接收一個或多個介面引數(在這種情況下,該引數是一個單個的Sink介面)。介面宣告輸入和輸出管道。Spring Cloud Stream提供了Source,Sink,和Processor介面。您還可以定義自己的介面。

 

The following listing shows the definition of the Sink interface:

 

下面顯示了Sink介面的定義:

 

public interface Sink {

  String INPUT = "input";

 

  @Input(Sink.INPUT)

  SubscribableChannel input();

}

 

The @Input annotation identifies an input channel, through which received messages enter the application. The @Output annotation identifies an output channel, through which published messages leave the application. The @Input and @Output annotations can take a channel name as a parameter. If a name is not provided, the name of the annotated method is used.

 

@Input註解標識一個輸入管道,通過它接收進入應用程式的訊息。@Output註解標識一個輸出通道,通過它釋出離開應用程式的訊息。@Input和@Output註解可以接收管道名稱作為引數。如果未提供名稱,則使用註解方法的名稱。

 

Spring Cloud Stream creates an implementation of the interface for you. You can use this in the application by autowiring it, as shown in the following example (from a test case):

 

Spring Cloud Stream為您建立了一個介面實現。您可以通過自動裝配在應用程式中使用它,如以下示例所示(來自測試用例):

 

@RunWith(SpringJUnit4ClassRunner.class)

@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)

@WebAppConfiguration

@DirtiesContext

public class StreamApplicationTests {

 

  @Autowired

  private Sink sink;

 

  @Test

  public void contextLoads() {

    assertNotNull(this.sink.input());

  }

}

 

4. Main Concepts   主要概念

 

Spring Cloud Stream provides a number of abstractions and primitives that simplify the writing of message-driven microservice applications. This section gives an overview of the following:

 

Spring Cloud Stream提供了許多抽象和原語,簡化了訊息驅動的微服務應用程式的編寫。本節概述了以下內容:

 

 

4.1. Application Model   應用程式模型

 

A Spring Cloud Stream application consists of a middleware-neutral core. The application communicates with the outside world through input and output channels injected into it by Spring Cloud Stream. Channels are connected to external brokers through middleware-specific Binder implementations.

 

Spring Cloud Stream應用程式由中介軟體中立的核心組成。應用程式通過Spring Cloud Stream注入其中的輸入和輸出管道與外界通訊。通過中介軟體特定的Binder實現,將管道連線到外部代理。

 

Figure 1. Spring Cloud Stream Application

 

4.1.1. Fat JAR   胖JAR

 

Spring Cloud Stream applications can be run in stand-alone mode from your IDE for testing. To run a Spring Cloud Stream application in production, you can create an executable (or “fat”) JAR by using the standard Spring Boot tooling provided for Maven or Gradle. See the Spring Boot Reference Guide for more details.

 

Spring Cloud Stream應用程式可以在IDE中以獨立模式執行以進行測試。要在生產中執行Spring Cloud Stream應用程式,可以使用為Maven或Gradle提供的標準Spring Boot工具建立可執行(或“胖”)JAR。有關更多詳細資訊,請參見Spring Boot Reference Guide

 

4.2. The Binder Abstraction   Binder抽象

 

Spring Cloud Stream provides Binder implementations for Kafka and Rabbit MQ. Spring Cloud Stream also includes a TestSupportBinder, which leaves a channel unmodified so that tests can interact with channels directly and reliably assert on what is received. You can also use the extensible API to write your own Binder.

 

Spring Cloud Stream為KafkaRabbit MQ提供了Binder實現。Spring Cloud Stream還包含一個TestSupportBinder,它保留了一個未修改的管道,以便測試可以直接與管道互動,並可靠地斷言收到的內容。您還可以使用可擴充套件API編寫自己的Binder。

 

Spring Cloud Stream uses Spring Boot for configuration, and the Binder abstraction makes it possible for a Spring Cloud Stream application to be flexible in how it connects to middleware. For example, deployers can dynamically choose, at runtime, the destinations (such as the Kafka topics or RabbitMQ exchanges) to which channels connect. Such configuration can be provided through external configuration properties and in any form supported by Spring Boot (including application arguments, environment variables, and application.yml or application.properties files). In the sink example from the Introducing Spring Cloud Stream section, setting the spring.cloud.stream.bindings.input.destination application property to raw-sensor-data causes it to read from the raw-sensor-data Kafka topic or from a queue bound to the raw-sensor-data RabbitMQ exchange.

 

Spring Cloud Stream使用Spring Boot進行配置,Binder抽象使Spring Cloud Stream應用程式可以靈活地連線到中介軟體。例如,部署者可以在執行時動態選擇管道連線的目的地(例如Kafka主題或RabbitMQ交換)。可以通過外部配置屬性以及Spring Boot支援的任何形式(包括應用程式引數,環境變數,和application.yml或application.properties檔案)來提供此類配置。在Introducing Spring Cloud Stream部分的接收器示例中,將spring.cloud.stream.bindings.input.destination應用程式屬性設定為raw-sensor-data以使其從raw-sensor-data Kafka主題或繫結到raw-sensor-data RabbitMQ交換的佇列中讀取。

 

Spring Cloud Stream automatically detects and uses a binder found on the classpath. You can use different types of middleware with the same code. To do so, include a different binder at build time. For more complex use cases, you can also package multiple binders with your application and have it choose the binder( and even whether to use different binders for different channels) at runtime.

 

Spring Cloud Stream自動檢測並使用類路徑中找到的繫結器。您可以使用具有相同程式碼的不同型別的中介軟體。為此,請在構建時包含不同的繫結器。對於更復雜的用例,您還可以在應用程式中打包多個繫結器,並讓它在執行時選擇繫結器(甚至是否為不同的通道使用不同的繫結器)。

 

4.3. Persistent Publish-Subscribe Support   持久化釋出-訂閱支援

 

Communication between applications follows a publish-subscribe model, where data is broadcast through shared topics. This can be seen in the following figure, which shows a typical deployment for a set of interacting Spring Cloud Stream applications.

 

應用程式之間的通訊遵循釋出 - 訂閱模型,其中資料通過共享主題廣播。這可以在下圖中看到,該圖顯示了一組互動式Spring Cloud Stream應用程式的典型部署。

 

Figure 2. Spring Cloud Stream Publish-Subscribe

 

Data reported by sensors to an HTTP endpoint is sent to a common destination named raw-sensor-data. From the destination, it is independently processed by a microservice application that computes time-windowed averages and by another microservice application that ingests the raw data into HDFS (Hadoop Distributed File System). In order to process the data, both applications declare the topic as their input at runtime.

 

感測器向HTTP端點報告的資料將傳送到名為raw-sensor-data的公共目的地。從目的地開始,它由一個計算時間窗平均值的微服務應用程式和另一個將原始資料攝入HDFS(Hadoop分散式檔案系統)的微服務應用程式單獨處理。為了處理資料,兩個應用程式都將主題宣告為執行時的輸入。

 

The publish-subscribe communication model reduces the complexity of both the producer and the consumer and lets new applications be added to the topology without disruption of the existing flow. For example, downstream from the average-calculating application, you can add an application that calculates the highest temperature values for display and monitoring. You can then add another application that interprets the same flow of averages for fault detection. Doing all communication through shared topics rather than point-to-point queues reduces coupling between microservices.

 

釋出 - 訂閱通訊模型降低了生產者和消費者的複雜性,並允許將新應用程式新增到拓撲中,而不會中斷現有流程。例如,在平均值計算應用程式的下游,您可以新增計算顯示和監視的最高溫度值的應用程式。然後,您可以新增另一個應用程式來解釋相同的平均流量以進行故障檢測。通過共享主題而不是點對點佇列進行所有通訊可以減少微服務之間的耦合。

 

While the concept of publish-subscribe messaging is not new, Spring Cloud Stream takes the extra step of making it an opinionated choice for its application model. By using native middleware support, Spring Cloud Stream also simplifies use of the publish-subscribe model across different platforms.

 

雖然釋出 - 訂閱訊息的概念並不新鮮,但Spring Cloud Stream採取了額外的步驟,使其成為其應用程式模型的自覺選擇。通過使用原生中介軟體支援,Spring Cloud Stream還簡化了跨不同平臺的釋出 - 訂閱模型的使用。

 

4.4. Consumer Groups   消費者組

 

While the publish-subscribe model makes it easy to connect applications through shared topics, the ability to scale up by creating multiple instances of a given application is equally important. When doing so, different instances of an application are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message.

 

雖然釋出 - 訂閱模型使通過共享主題輕鬆連線應用程式,但通過建立給定應用程式的多個例項來擴充套件的能力同樣重要。執行此操作時,應用程式的不同例項將放置在競爭的消費者關係中,其中只有一個例項需要處理給定的訊息。

 

Spring Cloud Stream models this behavior through the concept of a consumer group. (Spring Cloud Stream consumer groups are similar to and inspired by Kafka consumer groups.) Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name. For the consumers shown in the following figure, this property would be set as spring.cloud.stream.bindings.<channelName>.group=hdfsWrite or spring.cloud.stream.bindings.<channelName>.group=average.

 

Spring Cloud Stream通過消費者組的概念對此行為進行建模。(Spring Cloud Stream消費者組與Kafka消費者組類似並受其啟發。)每個消費者繫結都可以使用spring.cloud.stream.bindings.<channelName>.group屬性來指定組名稱。對於下圖中顯示的消費者,此屬性將設定為spring.cloud.stream.bindings.<channelName>.group=hdfsWrite或spring.cloud.stream.bindings.<channelName>.group=average。

 

Figure 3. Spring Cloud Stream Consumer Groups

 

All groups that subscribe to a given destination receive a copy of published data, but only one member of each group receives a given message from that destination. By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups.

 

訂閱給定目的地的所有組都會收到已釋出資料的副本,但每個組中只有一個成員從該目的地接收給定的訊息。預設情況下,當未指定組時,Spring Cloud Stream會將應用程式分配給與所有其他消費者組處於釋出 - 訂閱關係的一個匿名且獨立的單成員消費者組。

 

4.5. Consumer Types   消費者型別

 

Two types of consumer are supported:

  • Message-driven (sometimes referred to as Asynchronous)
  • Polled (sometimes referred to as Synchronous)

 

支援兩種型別的消費者:

  • 訊息驅動(有時稱為非同步)
  • 輪詢(有時稱為同步)

 

Prior to version 2.0, only asynchronous consumers were supported. A message is delivered as soon as it is available and a thread is available to process it.

When you wish to control the rate at which messages are processed, you might want to use a synchronous consumer.

 

在2.0版之前,僅支援非同步消費者。訊息一旦可用就會傳遞,並且有一個執行緒可以處理它。

如果要控制處理訊息的速率,可能需要使用同步消費者。

 

4.5.1. Durability   永續性

 

Consistent with the opinionated application model of Spring Cloud Stream, consumer group subscriptions are durable. That is, a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped.

 

與Spring Cloud Stream的固定應用模型一致,消費者組訂閱是持久的。也就是說,繫結器實現確保組訂閱是持久的,並且一旦建立了組的至少一個訂閱,該組就接收訊息,即使它們是在組中的所有應用程式都被停止時傳送的。

 

Anonymous subscriptions are non-durable by nature. For some binder implementations (such as RabbitMQ), it is possible to have non-durable group subscriptions.

匿名訂閱本質上是非持久的。對於某些繫結器實現(例如RabbitMQ),可以具有非持久的組訂閱。

 

In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application’s instances from receiving duplicate messages (unless that behavior is desired, which is unusual).

 

通常,在將應用程式繫結到給定目的地時,最好始終指定消費者組。擴充套件Spring Cloud Stream應用程式時,必須為每個輸入繫結指定一個消費者組。這樣做可以防止應用程式的例項接收重複的訊息(除非需要這種行為,這是不正常的)。

 

4.6. Partitioning Support   分割槽支援