Spring Cloud Stream Kafka: Messages in a Specific Partition Are Always Consumed by a Specific Consumer Instance
阿新 • Published: 2019-01-02
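Goal of the experiment: messages in a given Kafka partition are always consumed by one particular instance of the consumer application. For example, partition 0 is consumed by the instance with instance index 0, partition 1 by the instance with instance index 1, and partition 2 by the instance with instance index 2.
Project overview: the project consists of 1 producer instance and 3 consumer instances. Both the producer application and the consumer application are Spring Cloud Eureka client projects. The producer instance sends messages to the 3 partitions of a Kafka topic, and the 3 consumer instances each consume one of those partitions according to their instance index: instance index 0 consumes partition 0, instance index 1 consumes partition 1, and instance index 2 consumes partition 2.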
Producer project structure:
Add the Spring Cloud Stream dependencies
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Application class:
SpringCloudStreamKafkaProducerApplication.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@EnableBinding(Source.class)
@EnableScheduling
@SpringBootApplication
public class SpringCloudStreamKafkaProducerApplication {

    @Autowired
    private Source source;

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaProducerApplication.class, args);
    }

    // Every 5 seconds, send a Person whose id is the last decimal digit (0-9) of the current timestamp.
    @Scheduled(fixedRate = 5000)
    public void handle1() {
        Person person = new Person();
        Long currentTimeMillis = System.currentTimeMillis();
        person.setId(Long.parseLong(currentTimeMillis.toString().substring(currentTimeMillis.toString().length() - 1)));
        person.setName("rock ");
        System.out.println("send a person..." + person);
        source.output().send(MessageBuilder.withPayload(person).build());
    }

    public static class Person {
        private Long id;
        private String name;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}
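The id computation in handle1() is easy to misread, so here is a minimal plain-Java sketch of what it does: it keeps only the last decimal digit of the timestamp, which for a non-negative value is the same as taking the value modulo 10. The class and method names (PersonIdSketch, lastDigitViaString, lastDigitViaModulo) are hypothetical and are not part of the original project.

```java
// Sketch only: PersonIdSketch and its method names are hypothetical helpers.
final class PersonIdSketch {

    // Mirrors the expression in handle1(): take the last character of the decimal string.
    static long lastDigitViaString(long millis) {
        String text = Long.toString(millis);
        return Long.parseLong(text.substring(text.length() - 1));
    }

    // Equivalent arithmetic form; only valid for non-negative values.
    static long lastDigitViaModulo(long millis) {
        return millis % 10;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Both forms yield the same digit in the range 0-9.
        System.out.println(lastDigitViaString(now) + " == " + lastDigitViaModulo(now));
    }
}
```

Because the id is always in the range 0 to 9, the producer can only ever emit ten distinct partition keys.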
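Channel class:
CustomSource.java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Declares two custom output channels. The application class above binds only Source.class,
// so these channels are not used in this experiment.
public interface CustomSource {
    String OUTPUT1 = "output1";

    @Output(CustomSource.OUTPUT1)
    MessageChannel output1();

    String OUTPUT2 = "output2";

    @Output(CustomSource.OUTPUT2)
    MessageChannel output2();
}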
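Configuration class:
KafkaBindingConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaBindingConfig {

    // Bean name matches partitionKeyExtractorName in application.properties.
    @Bean
    public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
        return new CustomPartitionKeyExtractorClass();
    }

    // Bean name matches partitionSelectorName in application.properties.
    @Bean
    public CustomPartitionSelectorClass customPartitionSelector() {
        return new CustomPartitionSelectorClass();
    }
}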
CustomPartitionKeyExtractorClass.java
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;

/**
 * <p>Description: strategy that extracts the partition key from a Message</p>
 */
public class CustomPartitionKeyExtractorClass implements PartitionKeyExtractorStrategy {
    @Override
    public Object extractKey(Message<?> message) {
        Object obj = message.getPayload();
        System.out.println("Message payload: " + obj);
        if (obj instanceof SpringCloudStreamKafkaProducerApplication.Person) {
            SpringCloudStreamKafkaProducerApplication.Person person = (SpringCloudStreamKafkaProducerApplication.Person) obj;
            // Use the person's id as the partition key.
            return person.getId();
        }
        // Other payload types yield no key.
        return null;
    }
}
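CustomPartitionSelectorClass.java
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;

/**
 * <p>Description: strategy that decides which partition a message is sent to</p>
 */
public class CustomPartitionSelectorClass implements PartitionSelectorStrategy {
    @Override
    public int selectPartition(Object key, int partitionCount) {
        System.out.println("Partition key: " + key + " partitionCount: " + partitionCount);
        // Guard the cast: only numeric keys are mapped by modulo; anything else goes to partition 0.
        if (key instanceof Number) {
            // floorMod keeps the result in [0, partitionCount) even for a negative key.
            return (int) Math.floorMod(((Number) key).longValue(), (long) partitionCount);
        }
        return 0;
    }
}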
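To see how the extractor and selector spread the producer's messages, the following plain-Java sketch applies the same modulo rule to every id the producer can emit (0 to 9) with 3 partitions. It is a stand-in that uses no Spring types; PartitionSelectionSketch and idsByPartition are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: a plain-Java stand-in for the selector logic, not the Spring strategy itself.
final class PartitionSelectionSketch {

    // Same rule as the selector above: key modulo partition count, partition 0 for a missing key.
    static int selectPartition(Long key, int partitionCount) {
        if (key == null) {
            return 0;
        }
        return (int) Math.floorMod(key.longValue(), (long) partitionCount);
    }

    // Groups the ids 0..9 (the only ids the producer emits) by their target partition.
    static Map<Integer, List<Long>> idsByPartition(int partitionCount) {
        Map<Integer, List<Long>> result = new TreeMap<>();
        for (long id = 0; id <= 9; id++) {
            int partition = selectPartition(id, partitionCount);
            result.computeIfAbsent(partition, p -> new ArrayList<>()).add(id);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {0=[0, 3, 6, 9], 1=[1, 4, 7], 2=[2, 5, 8]}
        System.out.println(idsByPartition(3));
    }
}
```

With ten possible ids and three partitions, partition 0 receives four of the ids and the other two partitions receive three each, so the load is close to even but not exactly so.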
Configuration file:
application.properties
server.port=8881
spring.application.name=spring-cloud-stream-kafka-producer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
# Kafka Binder Properties
# A list of brokers to which the Kafka binder connects.
# Default: localhost.
spring.cloud.stream.kafka.binder.brokers=localhost:9092
# If set to true, the binder creates new topics automatically.
# If set to false, the binder relies on the topics being already configured.
# In the latter case, if the topics do not exist, the binder fails to start.
# This setting is independent of the auto.create.topics.enable setting of the broker and does not influence it.
# If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
# Default: true.
spring.cloud.stream.kafka.binder.autoCreateTopics=true
# If set to true, the binder creates new partitions if required.
# If set to false, the binder relies on the partition size of the topic being already configured.
# If the partition count of the target topic is smaller than the expected value, the binder fails to start.
# Default: false.
spring.cloud.stream.kafka.binder.autoAddPartitions=true
management.endpoints.web.exposure.include=bindings
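# Send messages to topic2 through the output channel
spring.cloud.stream.bindings.output.destination=topic2
spring.cloud.stream.bindings.output.content-type=application/json
# Configure the output binding for partitioning
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
# If a "no subscribers" error is reported once the following property is set, enable autoAddPartitions=true
# Distribute outgoing messages across 3 partitions
spring.cloud.stream.bindings.output.producer.partitionCount=3
# Name of the partition key extractor bean, which extracts the partition key from the message
# Note: the Spring Cloud Stream reference documentation describes partitionKeyExtractorName and
# partitionKeyExpression as mutually exclusive, so one of the two is likely sufficient.
spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor
# Name of the custom partition selector bean, which computes the target partition from the partition key and partitionCount
spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector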
# LOGGING
#logging.level.root=WARN
#logging.level.org.springframework.web=DEBUG
#logging.level.org.springframework=DEBUG
#logging.level.com.spring.cloud.stream.kafka.consumer.producer=DEBUG
logging.pattern.console=${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %4line %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}
Consumer project structure:
Add the Spring Cloud Stream dependencies
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Application class:
KafkaConsumer1Application.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
@SpringBootApplication
public class KafkaConsumer1Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer1Application.class, args);
    }

    // Receives each message from the input channel, converted from JSON into a Person.
    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("handle Received: " + person);
    }

    public static class Person {
        private Long id;
        private String name;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}
Configuration files:
application-c1.properties
server.port=8871
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer
# Number of instances of this application and the index of this instance
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=0
# Configure the input binding for partitioning
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1
# When autoRebalanceEnabled is true (the default), Kafka distributes partitions across the instances, and instanceCount, instanceIndex, and partitioned are not needed.
# When autoRebalanceEnabled is false, the binder uses instanceCount and instanceIndex to decide which partitions each instance subscribes to.
# The number of partitions must be at least the number of instances.
# The binder, rather than Kafka, computes the partition assignment.
# This lets messages from a given partition always go to the same instance.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false
application-c2.properties
server.port=8872
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer
# Number of instances of this application and the index of this instance
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=1
# Configure the input binding for partitioning
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1
# When autoRebalanceEnabled is true (the default), Kafka distributes partitions across the instances, and instanceCount, instanceIndex, and partitioned are not needed.
# When autoRebalanceEnabled is false, the binder uses instanceCount and instanceIndex to decide which partitions each instance subscribes to.
# The number of partitions must be at least the number of instances.
# The binder, rather than Kafka, computes the partition assignment.
# This lets messages from a given partition always go to the same instance.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false
application-c3.properties
server.port=8873
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer
# Number of instances of this application and the index of this instance
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=2
# Configure the input binding for partitioning
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1
# When autoRebalanceEnabled is true (the default), Kafka distributes partitions across the instances, and instanceCount, instanceIndex, and partitioned are not needed.
# When autoRebalanceEnabled is false, the binder uses instanceCount and instanceIndex to decide which partitions each instance subscribes to.
# The number of partitions must be at least the number of instances.
# The binder, rather than Kafka, computes the partition assignment.
# This lets messages from a given partition always go to the same instance.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false
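The three consumer configurations differ only in server.port and instanceIndex. The sketch below models how a static assignment could be derived from instanceCount and instanceIndex once auto-rebalancing is disabled. The modulo rule used here (a partition belongs to the instance whose index equals the partition number modulo instanceCount) is an assumption that is consistent with the result reported in this post; it is not taken from the binder's source. StaticAssignmentSketch and partitionsFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: models a static partition assignment; the modulo rule is an assumption.
final class StaticAssignmentSketch {

    // Returns the partitions an instance would listen to under the assumed rule.
    static List<Integer> partitionsFor(int instanceIndex, int instanceCount, int partitionCount) {
        List<Integer> assigned = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (partition % instanceCount == instanceIndex) {
                assigned.add(partition);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Three instances and three partitions, as in the configuration files above.
        for (int index = 0; index < 3; index++) {
            System.out.println("instanceIndex " + index + " -> partitions " + partitionsFor(index, 3, 3));
        }
    }
}
```

Under this rule, each of the three instances ends up with exactly one partition, and if the topic had six partitions, instance index 1 would receive partitions 1 and 4.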
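Result: start the producer and the 3 consumer instances. The output shows that the consumer with instance index 0 consumes partition 0, the consumer with instance index 1 consumes partition 1, and the consumer with instance index 2 consumes partition 2.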