Spring Cloud Stream 配置選項
配置選項
Spring Cloud Stream支援常規配置選項以及繫結和繫結器的配置。一些繫結器允許額外的繫結屬性來支援中介軟體特定的功能。
可以通過Spring Boot支援的任何機制將配置選項提供給Spring Cloud Stream應用程式。這包括應用程式引數,環境變數和YAML或.properties檔案。
Spring Cloud Stream Properties
spring.cloud.stream.instanceCount
應用程式部署例項的數量。必須設定分割槽,如果使用Kafka。
預設值:1。
spring.cloud.stream.instanceIndex
應用程式的例項索引:從0
spring.cloud.stream.dynamicDestinations
可以動態繫結的目標列表(例如,在動態路由方案中)。如果設定,只能列出目的地。
預設值:空(允許任何目的地繫結)。
spring.cloud.stream.defaultBinder
如果配置了多個繫結器,則使用預設的binder。請參閱Classpath上的Multiple Binders。
預設值:空。
spring.cloud.stream.overrideCloudConnectors
此屬性僅適用於cloud配置檔案啟用且Spring Cloud聯結器隨應用程式一起提供。如果屬性為false(預設值),繫結器將檢測適合的繫結服務(例如,在Cloud Foundry中為RabbitMQ繫結器繫結的RabbitMQ服務),並將使用它來建立連線(通常通過Spring Cloud聯結器)。當設定為true時,此屬性指示繫結器完全忽略繫結的服務,並依賴Spring Boot屬性(例如,依賴於RabbitMQ繫結器環境中提供的spring.rabbitmq.*屬性)。當連線到多個系統時,此屬性的典型用法將巢狀在定製環境中。
預設值:false。
繫結Properties
繫結屬性使用格式spring.cloud.stream.bindings.<channelName>.<property>=<value>
為了避免重複,Spring Cloud Stream支援所有通道的設定值,格式為spring.cloud.stream.default.<property>=<value>。
在下面的內容中,我們指出我們在哪裡省略了spring.cloud.stream.bindings.<channelName>.字首,並且只關注屬性名稱,但有一個理解,字首將被包含在執行時。
Properties使用Spring Cloud Stream
以下繫結屬性可用於輸入和輸出繫結,並且必須以spring.cloud.stream.bindings.<channelName>.為字首,例如spring.cloud.stream.bindings.input.destination=ticktock。
可以使用字首spring.cloud.stream.default設定預設值,例如spring.cloud.stream.default.contentType=application/json。
目的地
繫結中介軟體上的通道的目標目標(例如,RabbitMQ交換或Kafka主題)。如果通道繫結為消費者,則可以將其繫結到多個目標,並且目標名稱可以指定為逗號分隔的字串值。如果未設定,則使用通道名稱。此屬性的預設值不能被覆蓋。
組
渠道的消費群體。僅適用於入站繫結。參見消費者群體。
預設值:null(表示匿名消費者)。
內容型別
頻道的內容型別。
預設值:null(以便不執行型別強制)。
粘合劑
這種繫結使用的粘合劑。有關詳細資訊,請參閱Classpath上的Multiple Binders。
預設值:null(預設的binder將被使用,如果存在)。
消費者物業
以下繫結屬性僅適用於輸入繫結,並且必須以spring.cloud.stream.bindings.<channelName>.consumer.為字首,例如spring.cloud.stream.bindings.input.consumer.concurrency=3。
預設值可以使用字首spring.cloud.stream.default.consumer設定,例如spring.cloud.stream.default.consumer.headerMode=raw。
併發
入站消費者的併發性。
預設值:1。
分割槽
消費者是否從分割槽生產者接收資料。
預設值:false。
headerMode
設定為raw時,禁用輸入標頭檔案解析。僅適用於不支援訊息頭的訊息中介軟體,並且需要頭部嵌入。入站資料來自外部Spring Cloud Stream應用程式時很有用。
預設值:embeddedHeaders。
maxAttempts
如果處理失敗,則嘗試處理訊息的次數(包括第一個)。設定為1以禁用重試。
預設值:3。
backOffInitialInterval
退避初始間隔重試。
預設值:1000。
backOffMaxInterval
最大回退間隔。
預設值:10000。
backOffMultiplier
退避倍數。
預設值:2.0。
instanceIndex
當設定為大於等於零的值時,允許自定義此消費者的例項索引(如果與spring.cloud.stream.instanceIndex不同)。設定為負值時,它將預設為spring.cloud.stream.instanceIndex。
預設值:-1。
instanceCount
當設定為大於等於零的值時,允許自定義此消費者的例項計數(如果與spring.cloud.stream.instanceCount不同)。當設定為負值時,它將預設為spring.cloud.stream.instanceCount。
預設值:-1。
製作人Properties
以下繫結屬性僅可用於輸出繫結,並且必須以spring.cloud.stream.bindings..producer.為字首,例如spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id。
預設值可以使用字首spring.cloud.stream.default.producer設定,例如spring.cloud.stream.default.producer.partitionKeyExpression=payload.id。
partitionKeyExpression
一個確定如何分配出站資料的SpEL表示式。如果設定,或者如果設定了partitionKeyExtractorClass,則該通道上的出站資料將被分割槽,並且partitionCount必須設定為大於1的值才能生效。這兩個選項是相互排斥的。請參閱分割槽支援。
預設值:null。
partitionKeyExtractorClass
一個PartitionKeyExtractorStrategy實現。如果設定,或者如果設定了partitionKeyExpression,則該通道上的出站資料將被分割槽,並且partitionCount必須設定為大於1的值才能生效。這兩個選項是相互排斥的。請參閱分割槽支援。
預設值:null。
partitionSelectorClass
一個PartitionSelectorStrategy實現。與partitionSelectorExpression相互排斥。如果沒有設定,則分割槽將被選為hashCode(key) % partitionCount,其中key通過partitionKeyExpression或partitionKeyExtractorClass計算。
預設值:null。
partitionSelectorExpression
用於自定義分割槽選擇的SpEL表示式。與partitionSelectorClass相互排斥。如果沒有設定,則分割槽將被選為hashCode(key) % partitionCount,其中key通過partitionKeyExpression或partitionKeyExtractorClass計算。
預設值:null。
partitionCount
如果啟用分割槽,則資料的目標分割槽數。如果生產者被分割槽,則必須設定為大於1的值。在Kafka,解釋為提示; 而是使用更大的和目標主題的分割槽計數。
預設值:1。
requiredGroups
生成者必須確保訊息傳遞的組合的逗號分隔列表,即使它們在建立之後啟動(例如,通過在RabbitMQ中預先建立持久佇列)。
headerMode
設定為raw時,禁用輸出上的標題嵌入。僅適用於不支援訊息頭的訊息中介軟體,並且需要頭部嵌入。生成非Spring Cloud Stream應用程式的資料時很有用。
預設值:embeddedHeaders。
useNativeEncoding
當設定為true時,出站訊息由客戶端庫直接序列化,必須相應配置(例如設定適當的Kafka生產者值序列化程式)。當使用此配置時,出站訊息編組不是基於繫結的contentType。當使用本地編碼時,消費者有責任使用適當的解碼器(例如:Kafka消費者價值解串器)來對入站訊息進行反序列化。此外,當使用本機編碼/解碼時,headerMode屬性將被忽略,標題不會嵌入到訊息中。
預設值:false。
使用動態繫結目的地
除了通過@EnableBinding定義的通道之外,Spring Cloud Stream允許應用程式將訊息傳送到動態繫結的目的地。這是有用的,例如,當目標目標需要在執行時確定。應用程式可以使用@EnableBinding註冊自動註冊的BinderAwareChannelResolver bean。
屬性“spring.cloud.stream.dynamicDestinations”可用於將動態目標名稱限制為預先已知的集合(白名單)。如果屬性未設定,任何目的地都可以動態繫結。
可以直接使用BinderAwareChannelResolver,如以下示例所示,其中REST控制器使用路徑變數來確定目標通道。
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @PathVariable("target") target,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, target, contentType);
}
private void sendMessage(String body, String target, Object contentType) {
resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
}
在預設埠8080上啟動應用程式後,傳送以下資料時:
curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers
curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders
目的地的客戶和“訂單”是在經紀人中建立的(例如:在Rabbit的情況下進行交換,或者在Kafka的情況下為主題),其名稱為“客戶”和“訂單”,資料被髮布到適當的目的地。
BinderAwareChannelResolver是通用的Spring Integration DestinationResolver,可以注入其他元件。例如,在使用基於傳入JSON訊息的target欄位的SpEL表示式的路由器中。
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/", method = POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, contentType);
}
private void sendMessage(Object body, Object contentType) {
routerChannel().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "routerChannel")
public MessageChannel routerChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "routerChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router =
new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
}
歡迎關注作者的公眾號《Java程式設計生活》,每日記載Java程式猿工作中遇到的問題