1. 程式人生 > >Spring Cloud Stream 配置選項

Spring Cloud Stream 配置選項

配置選項

Spring Cloud Stream支援常規配置選項以及繫結和繫結器的配置。一些繫結器允許額外的繫結屬性來支援中介軟體特定的功能。

可以通過Spring Boot支援的任何機制將配置選項提供給Spring Cloud Stream應用程式。這包括應用程式引數,環境變數和YAML或.properties檔案。

Spring Cloud Stream Properties

spring.cloud.stream.instanceCount

應用程式部署例項的數量。必須設定分割槽,如果使用Kafka。

預設值:1

spring.cloud.stream.instanceIndex

應用程式的例項索引:從0

instanceCount - 1的數字。用於分割槽和使用Kafka。在Cloud Foundry中自動設定以匹配應用程式的例項索引。

spring.cloud.stream.dynamicDestinations

可以動態繫結的目標列表(例如,在動態路由方案中)。如果設定,只能列出目的地。

預設值:空(允許任何目的地繫結)。

spring.cloud.stream.defaultBinder

如果配置了多個繫結器,則使用預設的binder。請參閱Classpath上的Multiple Binders。

預設值:空。

spring.cloud.stream.overrideCloudConnectors

此屬性僅適用於cloud配置檔案啟用且Spring Cloud聯結器隨應用程式一起提供。如果屬性為false(預設值),繫結器將檢測適合的繫結服務(例如,在Cloud Foundry中為RabbitMQ繫結器繫結的RabbitMQ服務),並將使用它來建立連線(通常通過Spring Cloud聯結器)。當設定為true時,此屬性指示繫結器完全忽略繫結的服務,並依賴Spring Boot屬性(例如,依賴於RabbitMQ繫結器環境中提供的spring.rabbitmq.*屬性)。當連線到多個系統時,此屬性的典型用法將巢狀在定製環境中。

預設值:false。

繫結Properties

繫結屬性使用格式spring.cloud.stream.bindings.<channelName>.<property>=<value>

提供。<channelName>表示正在配置的通道的名稱(例如Sourceoutput)。

為了避免重複,Spring Cloud Stream支援所有通道的設定值,格式為spring.cloud.stream.default.<property>=<value>

在下面的內容中,我們指出我們在哪裡省略了spring.cloud.stream.bindings.<channelName>.字首,並且只關注屬性名稱,但有一個理解,字首將被包含在執行時。

Properties使用Spring Cloud Stream

以下繫結屬性可用於輸入和輸出繫結,並且必須以spring.cloud.stream.bindings.<channelName>.為字首,例如spring.cloud.stream.bindings.input.destination=ticktock

可以使用字首spring.cloud.stream.default設定預設值,例如spring.cloud.stream.default.contentType=application/json

目的地

繫結中介軟體上的通道的目標目標(例如,RabbitMQ交換或Kafka主題)。如果通道繫結為消費者,則可以將其繫結到多個目標,並且目標名稱可以指定為逗號分隔的字串值。如果未設定,則使用通道名稱。此屬性的預設值不能被覆蓋。

渠道的消費群體。僅適用於入站繫結。參見消費者群體。

預設值:null(表示匿名消費者)。

內容型別

頻道的內容型別。

預設值:null(以便不執行型別強制)。

粘合劑

這種繫結使用的粘合劑。有關詳細資訊,請參閱Classpath上的Multiple Binders。

預設值:null(預設的binder將被使用,如果存在)。

消費者物業

以下繫結屬性僅適用於輸入繫結,並且必須以spring.cloud.stream.bindings.<channelName>.consumer.為字首,例如spring.cloud.stream.bindings.input.consumer.concurrency=3

預設值可以使用字首spring.cloud.stream.default.consumer設定,例如spring.cloud.stream.default.consumer.headerMode=raw

併發

入站消費者的併發性。

預設值:1

分割槽

消費者是否從分割槽生產者接收資料。

預設值:false

headerMode

設定為raw時,禁用輸入標頭檔案解析。僅適用於不支援訊息頭的訊息中介軟體,並且需要頭部嵌入。入站資料來自外部Spring Cloud Stream應用程式時很有用。

預設值:embeddedHeaders

maxAttempts

如果處理失敗,則嘗試處理訊息的次數(包括第一個)。設定為1以禁用重試。

預設值:3

backOffInitialInterval

退避初始間隔重試。

預設值:1000

backOffMaxInterval

最大回退間隔。

預設值:10000

backOffMultiplier

退避倍數。

預設值:2.0

instanceIndex

當設定為大於等於零的值時,允許自定義此消費者的例項索引(如果與spring.cloud.stream.instanceIndex不同)。設定為負值時,它將預設為spring.cloud.stream.instanceIndex

預設值:-1

instanceCount

當設定為大於等於零的值時,允許自定義此消費者的例項計數(如果與spring.cloud.stream.instanceCount不同)。當設定為負值時,它將預設為spring.cloud.stream.instanceCount

預設值:-1

製作人Properties

以下繫結屬性僅可用於輸出繫結,並且必須以spring.cloud.stream.bindings..producer.為字首,例如spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id

預設值可以使用字首spring.cloud.stream.default.producer設定,例如spring.cloud.stream.default.producer.partitionKeyExpression=payload.id

partitionKeyExpression

一個確定如何分配出站資料的SpEL表示式。如果設定,或者如果設定了partitionKeyExtractorClass,則該通道上的出站資料將被分割槽,並且partitionCount必須設定為大於1的值才能生效。這兩個選項是相互排斥的。請參閱分割槽支援。

預設值:null。

partitionKeyExtractorClass

一個PartitionKeyExtractorStrategy實現。如果設定,或者如果設定了partitionKeyExpression,則該通道上的出站資料將被分割槽,並且partitionCount必須設定為大於1的值才能生效。這兩個選項是相互排斥的。請參閱分割槽支援。

預設值:null。

partitionSelectorClass

一個PartitionSelectorStrategy實現。與partitionSelectorExpression相互排斥。如果沒有設定,則分割槽將被選為hashCode(key) % partitionCount,其中key通過partitionKeyExpressionpartitionKeyExtractorClass計算。

預設值:null。

partitionSelectorExpression

用於自定義分割槽選擇的SpEL表示式。與partitionSelectorClass相互排斥。如果沒有設定,則分割槽將被選為hashCode(key) % partitionCount,其中key通過partitionKeyExpressionpartitionKeyExtractorClass計算。

預設值:null。

partitionCount

如果啟用分割槽,則資料的目標分割槽數。如果生產者被分割槽,則必須設定為大於1的值。在Kafka,解釋為提示; 而是使用更大的和目標主題的分割槽計數。

預設值:1

requiredGroups

生成者必須確保訊息傳遞的組合的逗號分隔列表,即使它們在建立之後啟動(例如,通過在RabbitMQ中預先建立持久佇列)。

headerMode

設定為raw時,禁用輸出上的標題嵌入。僅適用於不支援訊息頭的訊息中介軟體,並且需要頭部嵌入。生成非Spring Cloud Stream應用程式的資料時很有用。

預設值:embeddedHeaders

useNativeEncoding

當設定為true時,出站訊息由客戶端庫直接序列化,必須相應配置(例如設定適當的Kafka生產者值序列化程式)。當使用此配置時,出站訊息編組不是基於繫結的contentType。當使用本地編碼時,消費者有責任使用適當的解碼器(例如:Kafka消費者價值解串器)來對入站訊息進行反序列化。此外,當使用本機編碼/解碼時,headerMode屬性將被忽略,標題不會嵌入到訊息中。

預設值:false

使用動態繫結目的地

除了通過@EnableBinding定義的通道之外,Spring Cloud Stream允許應用程式將訊息傳送到動態繫結的目的地。這是有用的,例如,當目標目標需要在執行時確定。應用程式可以使用@EnableBinding註冊自動註冊的BinderAwareChannelResolver bean。

屬性“spring.cloud.stream.dynamicDestinations”可用於將動態目標名稱限制為預先已知的集合(白名單)。如果屬性未設定,任何目的地都可以動態繫結。

可以直接使用BinderAwareChannelResolver,如以下示例所示,其中REST控制器使用路徑變數來確定目標通道。

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

	@Autowired
	private BinderAwareChannelResolver resolver;

	@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @PathVariable("target") target,
	       @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, target, contentType);
	}

	private void sendMessage(String body, String target, Object contentType) {
		resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}
}

在預設埠8080上啟動應用程式後,傳送以下資料時:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

目的地的客戶和“訂單”是在經紀人中建立的(例如:在Rabbit的情況下進行交換,或者在Kafka的情況下為主題),其名稱為“客戶”和“訂單”,資料被髮布到適當的目的地。

BinderAwareChannelResolver是通用的Spring Integration DestinationResolver,可以注入其他元件。例如,在使用基於傳入JSON訊息的target欄位的SpEL表示式的路由器中。

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

	@Autowired
	private BinderAwareChannelResolver resolver;


	@RequestMapping(path = "/", method = POST, consumes = "application/json")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, contentType);
	}

	private void sendMessage(Object body, Object contentType) {
		routerChannel().send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}

	@Bean(name = "routerChannel")
	public MessageChannel routerChannel() {
		return new DirectChannel();
	}

	@Bean
	@ServiceActivator(inputChannel = "routerChannel")
	public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
		router.setDefaultOutputChannelName("default-output");
		router.setChannelResolver(resolver);
		return router;
	}
}

歡迎關注作者的公眾號《Java程式設計生活》,每日記載Java程式猿工作中遇到的問題 在這裡插入圖片描述