1. 程式人生 > >Spring Cloud Stream 測試與入門

Spring Cloud Stream 測試與入門

測試

Spring Cloud Stream支援測試您的微服務應用程式,而無需連線到訊息系統。您可以使用spring-cloud-stream-test-support庫提供的TestSupportBinder,可以將其作為測試依賴項新增到應用程式中:

   <dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-test-support</artifactId>
       <scope>
test</scope> </dependency>
注意 TestSupportBinder使用Spring Boot自動配置機制取代類路徑中找到的其他繫結。因此,新增binder作為依賴關係時,請確保正在使用test範圍。

TestSupportBinder允許使用者與繫結的頻道進行互動,並檢查應用程式傳送和接收的訊息

對於出站訊息通道,TestSupportBinder註冊單個訂戶,並將應用程式傳送的訊息保留在MessageCollector中。它們可以在測試過程中被檢索,並對它們做出斷言。

使用者還可以將訊息傳送到入站訊息通道,以便消費者應用程式可以使用訊息。以下示例顯示瞭如何在處理器上測試輸入和輸出通道。

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ExampleTest {

  @Autowired
  private Processor processor;

  @Autowired
  private MessageCollector messageCollector;

  @Test
  @SuppressWarnings("unchecked")
  public void testWiring
() { Message<String> message = new GenericMessage<>("hello"); processor.input().send(message); Message<String> received = (Message<String>) messageCollector.forChannel(processor.output()).poll(); assertThat(received.getPayload(), equalTo("hello world")); } @SpringBootApplication @EnableBinding(Processor.class) public static class MyProcessor { @Autowired private Processor channels; @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public String transform(String in) { return in + " world"; } } }

在上面的示例中,我們正在建立一個具有輸入和輸出通道的應用程式,通過Processor介面繫結。繫結的介面被注入測試,所以我們可以訪問這兩個通道。我們正在輸入頻道傳送訊息,我們使用Spring Cloud Stream測試支援提供的MessageCollector來捕獲訊息已經被髮送到輸出通道。收到訊息後,我們可以驗證元件是否正常工作。

健康指標

Spring Cloud Stream為粘合劑提供健康指標。它以binders的名義註冊,可以通過設定management.health.binders.enabled屬性啟用或禁用。

指標發射器

Spring Cloud Stream提供了一個名為spring-cloud-stream-metrics的模組,可以用來從Spring Boot度量端點到命名通道發出任何可用度量。該模組允許運營商從流應用收集指標,而不依賴輪詢其端點。

當您設定度量繫結的目標名稱(例如spring.cloud.stream.bindings.applicationMetrics.destination=<DESTINATION_NAME>)時,該模組將被啟用。可以以與任何其他生成器繫結相似的方式配置applicationMetricsapplicationMetricscontentType預設設定為application/json

以下屬性可用於自定義度量標準的排放:

spring.cloud.stream.metrics.key

要發射的度量的名稱。應該是每個應用程式的唯一值。

預設

KaTeX parse error: Expected '}', got 'EOF' at end of input: …plication.name:{vcap.application.name?{spring.config.name:application}}}

spring.cloud.stream.metrics.prefix

字首字串,以字首到度量鍵。

預設值:``

spring.cloud.stream.metrics.properties

就像includes選項一樣,它允許將白名單應用程式屬性新增到度量有效負載

預設值:null。

有關度量匯出過程的詳細概述,請參見Spring Boot參考文件。Spring Cloud Stream提供了一個名為application的指標匯出器,可以通過常規Spring Boot指標配置屬性進行配置

可以通過使用出口商的全域性Spring Boot配置設定或使用特定於匯出器的屬性來配置匯出器。要使用全域性配置設定,屬性應以spring.metric.export為字首(例如spring.metric.export.includes=integration**)。這些配置選項將適用於所有出口商(除非它們的配置不同)。或者,如果要使用與其他出口商不同的配置設定(例如,限制釋出的度量數量),則可以使用字首spring.metrics.export.triggers.application配置Spring Cloud Stream提供的度量匯出器(例如spring.metrics.export.triggers.application.includes=integration**)。

注意 由於Spring Boot的輕鬆約束,所包含的屬性的值可能與原始值稍有不同。作為經驗法則,度量匯出器將嘗試使用點符號(例如JAVA_HOME成為java.home)以一致的格式標準化所有屬性。規範化的目標是使下游使用者能夠始終如一地接收屬性名稱,無論它們如何設定在受監視的應用程式上(–spring.application.nameSPRING_APPLICATION_NAME始終會生成)。

以下是通過以下命令以JSON格式釋出到頻道的資料的示例:

java -jar time-source.jar \
    --spring.cloud.stream.bindings.applicationMetrics.destination=someMetrics \
    --spring.cloud.stream.metrics.properties=spring.application** \
    --spring.metrics.export.includes=integration.channel.input**,integration.channel.output**

得到的JSON是:

{
   "name":"time-source",
   "metrics":[
      {
         "name":"integration.channel.output.errorRate.mean",
         "value":0.0,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.errorRate.max",
         "value":0.0,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.errorRate.min",
         "value":0.0,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.errorRate.stdev",
         "value":0.0,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.errorRate.count",
         "value":0.0,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.sendCount",
         "value":6.0,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.sendRate.mean",
         "value":0.994885872292989,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.sendRate.max",
         "value":1.006247080013156,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.sendRate.min",
         "value":1.0012035220116378,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.sendRate.stdev",
         "value":6.505181111084848E-4,
         "timestamp":"2017-04-11T16:56:35.790Z"
      },
      {
         "name":"integration.channel.output.sendRate.count",
         "value":6.0,
         "timestamp":"2017-04-11T16:56:35.790Z"
      }
   ],
   "createdTime":"2017-04-11T20:56:35.790Z",
   "properties":{
      "spring.application.name":"time-source",
      "spring.application.index":"0"
   }
}

樣品

對於Spring Cloud Stream示例,請參閱GitHub上的spring-cloud-stream樣本儲存庫。

入門

要開始建立Spring Cloud Stream應用程式,請訪問Spring Initializr並建立一個名為“GreetingSource”的新Maven專案。在下拉選單中選擇Spring Boot {supported-spring-boot-version}。在“ 搜尋依賴關係”文字框中鍵入Stream RabbitStream Kafka,具體取決於您要使用的binder。

接下來,在與GreetingSourceApplication類相同的包中建立一個新類GreetingSource。給它以下程式碼:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;

@EnableBinding(Source.class)
public class GreetingSource {

    @InboundChannelAdapter(Source.OUTPUT)
    public String greet() {
        return "hello world " + System.currentTimeMillis();
    }
}

@EnableBinding註釋是觸發Spring Integration基礎架構元件的建立。具體來說,它將建立一個Kafka連線工廠,一個Kafka出站通道介面卡,並在Source介面中定義訊息通道:

public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();

}

自動配置還建立一個預設輪詢器,以便每秒呼叫greet()方法一次。標準的Spring Integration @InboundChannelAdapter註釋使用返回值作為訊息的有效內容向源的輸出通道傳送訊息。

要測試驅動此設定,請執行Kafka訊息代理。一個簡單的方法是使用Docker映象:

# On OS X
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka

# On Linux
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka

構建應用程式:

./mvnw clean package

消費者應用程式以類似的方式進行編碼。返回Initializr並建立另一個名為LoggingSink的專案。然後在與類LoggingSinkApplication相同的包中建立一個新類LoggingSink,並使用以下程式碼:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class LoggingSink {

    @StreamListener(Sink.INPUT)
    public void log(String message) {
        System.out.println(message);
    }
}

構建應用程式:

./mvnw clean package

要將GreetingSource應用程式連線到LoggingSink應用程式,每個應用程式必須共享相同的目標名稱。啟動這兩個應用程式如下所示,您將看到消費者應用程式列印“hello world”和時間戳到控制檯:

cd GreetingSource
java -jar target/GreetingSource-0.0.1-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=mydest

cd LoggingSink
java -jar target/LoggingSink-0.0.1-SNAPSHOT.jar --server.port=8090 --spring.cloud.stream.bindings.input.destination=mydest

(不同的伺服器埠可以防止兩個應用程式中用於維護Spring Boot執行器端點的HTTP埠的衝突。)

LoggingSink應用程式的輸出將如下所示:

[           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8090 (http)
[           main] com.example.LoggingSinkApplication       : Started LoggingSinkApplication in 6.828 seconds (JVM running for 7.371)
hello world 1458595076731
hello world 1458595077732
hello world 1458595078733
hello world 1458595079734
hello world 1458595080735

歡迎關注作者的公眾號《Java程式設計生活》,每日記載Java程式猿工作中遇到的問題 在這裡插入圖片描述