Spring Cloud Stream 測試與入門
測試
Spring Cloud Stream支援測試您的微服務應用程式,而無需連線到訊息系統。您可以使用spring-cloud-stream-test-support庫提供的TestSupportBinder,可以將其作為測試依賴項新增到應用程式中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope> test</scope>
</dependency>
注意 | TestSupportBinder使用Spring Boot自動配置機制取代類路徑中找到的其他繫結。因此,新增binder作為依賴關係時,請確保正在使用test範圍。 |
---|
TestSupportBinder允許使用者與繫結的頻道進行互動,並檢查應用程式傳送和接收的訊息
對於出站訊息通道,TestSupportBinder註冊單個訂戶,並將應用程式傳送的訊息保留在MessageCollector中。它們可以在測試過程中被檢索,並對它們做出斷言。
使用者還可以將訊息傳送到入站訊息通道,以便消費者應用程式可以使用訊息。以下示例顯示瞭如何在處理器上測試輸入和輸出通道。
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ExampleTest {
@Autowired
private Processor processor;
@Autowired
private MessageCollector messageCollector;
@Test
@SuppressWarnings("unchecked")
public void testWiring () {
Message<String> message = new GenericMessage<>("hello");
processor.input().send(message);
Message<String> received = (Message<String>) messageCollector.forChannel(processor.output()).poll();
assertThat(received.getPayload(), equalTo("hello world"));
}
@SpringBootApplication
@EnableBinding(Processor.class)
public static class MyProcessor {
@Autowired
private Processor channels;
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String transform(String in) {
return in + " world";
}
}
}
在上面的示例中,我們正在建立一個具有輸入和輸出通道的應用程式,通過Processor介面繫結。繫結的介面被注入測試,所以我們可以訪問這兩個通道。我們正在輸入頻道傳送訊息,我們使用Spring Cloud Stream測試支援提供的MessageCollector來捕獲訊息已經被髮送到輸出通道。收到訊息後,我們可以驗證元件是否正常工作。
健康指標
Spring Cloud Stream為粘合劑提供健康指標。它以binders的名義註冊,可以通過設定management.health.binders.enabled屬性啟用或禁用。
指標發射器
Spring Cloud Stream提供了一個名為spring-cloud-stream-metrics的模組,可以用來從Spring Boot度量端點到命名通道發出任何可用度量。該模組允許運營商從流應用收集指標,而不依賴輪詢其端點。
當您設定度量繫結的目標名稱(例如spring.cloud.stream.bindings.applicationMetrics.destination=<DESTINATION_NAME>)時,該模組將被啟用。可以以與任何其他生成器繫結相似的方式配置applicationMetrics。applicationMetrics的contentType預設設定為application/json。
以下屬性可用於自定義度量標準的排放:
spring.cloud.stream.metrics.key
要發射的度量的名稱。應該是每個應用程式的唯一值。
預設
KaTeX parse error: Expected '}', got 'EOF' at end of input: …plication.name:{vcap.application.name?{spring.config.name:application}}}
spring.cloud.stream.metrics.prefix
字首字串,以字首到度量鍵。
預設值:``
spring.cloud.stream.metrics.properties
就像includes選項一樣,它允許將白名單應用程式屬性新增到度量有效負載
預設值:null。
有關度量匯出過程的詳細概述,請參見Spring Boot參考文件。Spring Cloud Stream提供了一個名為application的指標匯出器,可以通過常規Spring Boot指標配置屬性進行配置。
可以通過使用出口商的全域性Spring Boot配置設定或使用特定於匯出器的屬性來配置匯出器。要使用全域性配置設定,屬性應以spring.metric.export為字首(例如spring.metric.export.includes=integration**)。這些配置選項將適用於所有出口商(除非它們的配置不同)。或者,如果要使用與其他出口商不同的配置設定(例如,限制釋出的度量數量),則可以使用字首spring.metrics.export.triggers.application配置Spring Cloud Stream提供的度量匯出器(例如spring.metrics.export.triggers.application.includes=integration**)。
注意 | 由於Spring Boot的輕鬆約束,所包含的屬性的值可能與原始值稍有不同。作為經驗法則,度量匯出器將嘗試使用點符號(例如JAVA_HOME成為java.home)以一致的格式標準化所有屬性。規範化的目標是使下游使用者能夠始終如一地接收屬性名稱,無論它們如何設定在受監視的應用程式上(–spring.application.name或SPRING_APPLICATION_NAME始終會生成)。 |
---|
以下是通過以下命令以JSON格式釋出到頻道的資料的示例:
java -jar time-source.jar \
--spring.cloud.stream.bindings.applicationMetrics.destination=someMetrics \
--spring.cloud.stream.metrics.properties=spring.application** \
--spring.metrics.export.includes=integration.channel.input**,integration.channel.output**
得到的JSON是:
{
"name":"time-source",
"metrics":[
{
"name":"integration.channel.output.errorRate.mean",
"value":0.0,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.errorRate.max",
"value":0.0,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.errorRate.min",
"value":0.0,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.errorRate.stdev",
"value":0.0,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.errorRate.count",
"value":0.0,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.sendCount",
"value":6.0,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.sendRate.mean",
"value":0.994885872292989,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.sendRate.max",
"value":1.006247080013156,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.sendRate.min",
"value":1.0012035220116378,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.sendRate.stdev",
"value":6.505181111084848E-4,
"timestamp":"2017-04-11T16:56:35.790Z"
},
{
"name":"integration.channel.output.sendRate.count",
"value":6.0,
"timestamp":"2017-04-11T16:56:35.790Z"
}
],
"createdTime":"2017-04-11T20:56:35.790Z",
"properties":{
"spring.application.name":"time-source",
"spring.application.index":"0"
}
}
樣品
對於Spring Cloud Stream示例,請參閱GitHub上的spring-cloud-stream樣本儲存庫。
入門
要開始建立Spring Cloud Stream應用程式,請訪問Spring Initializr並建立一個名為“GreetingSource”的新Maven專案。在下拉選單中選擇Spring Boot {supported-spring-boot-version}。在“ 搜尋依賴關係”文字框中鍵入Stream Rabbit或Stream Kafka,具體取決於您要使用的binder。
接下來,在與GreetingSourceApplication類相同的包中建立一個新類GreetingSource。給它以下程式碼:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
@EnableBinding(Source.class)
public class GreetingSource {
@InboundChannelAdapter(Source.OUTPUT)
public String greet() {
return "hello world " + System.currentTimeMillis();
}
}
@EnableBinding註釋是觸發Spring Integration基礎架構元件的建立。具體來說,它將建立一個Kafka連線工廠,一個Kafka出站通道介面卡,並在Source介面中定義訊息通道:
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
自動配置還建立一個預設輪詢器,以便每秒呼叫greet()方法一次。標準的Spring Integration @InboundChannelAdapter註釋使用返回值作為訊息的有效內容向源的輸出通道傳送訊息。
要測試驅動此設定,請執行Kafka訊息代理。一個簡單的方法是使用Docker映象:
# On OS X
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
# On Linux
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka
構建應用程式:
./mvnw clean package
消費者應用程式以類似的方式進行編碼。返回Initializr並建立另一個名為LoggingSink的專案。然後在與類LoggingSinkApplication相同的包中建立一個新類LoggingSink,並使用以下程式碼:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class LoggingSink {
@StreamListener(Sink.INPUT)
public void log(String message) {
System.out.println(message);
}
}
構建應用程式:
./mvnw clean package
要將GreetingSource應用程式連線到LoggingSink應用程式,每個應用程式必須共享相同的目標名稱。啟動這兩個應用程式如下所示,您將看到消費者應用程式列印“hello world”和時間戳到控制檯:
cd GreetingSource
java -jar target/GreetingSource-0.0.1-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=mydest
cd LoggingSink
java -jar target/LoggingSink-0.0.1-SNAPSHOT.jar --server.port=8090 --spring.cloud.stream.bindings.input.destination=mydest
(不同的伺服器埠可以防止兩個應用程式中用於維護Spring Boot執行器端點的HTTP埠的衝突。)
LoggingSink應用程式的輸出將如下所示:
[ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8090 (http)
[ main] com.example.LoggingSinkApplication : Started LoggingSinkApplication in 6.828 seconds (JVM running for 7.371)
hello world 1458595076731
hello world 1458595077732
hello world 1458595078733
hello world 1458595079734
hello world 1458595080735
歡迎關注作者的公眾號《Java程式設計生活》,每日記載Java程式猿工作中遇到的問題