springboot kafka group.id多消費組配置
阿新 • • 發佈:2019-01-01
很早之前就使用了springboot + kafka組合配置,但是之前使用的spring-kafka(1.1.7)版本較低,所以只能通過 spring.kafka.consumer.group-id=default_consumer_group 或者 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "default_consumer_group");的形式配置一個預設消組,當然理論上這也是沒有問題的,但是如果你定義的topic數量過多且併發消費比較大,只有一個消費組的配置方式就會暴露出很多問題,其中主要的一個問題便是每個topic分割槽的offset偏移量問題(在大併發下會出現offset異常問題),因為他們都儲存在同一個消費組中。
直到後來釋出了spring-kafka 1.3.x的版本後,增加了groupId的屬性,非常方便的幫助我們解決了實現每個topic自定義一個消費組的問題,我們再也不用共用一個消費組了。
接下來通過程式碼演示看是否如我們的期望一樣:
pom依賴
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--引入elasticsearch--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
application.properties
server.port=10087 spring.application.name=example #topic spring.kafka.bootstrap-servers=10.0.2.22:9092 kafka.test.topic=TEST_TOPIC #es spring.data.elasticsearch.cluster-name=elasticsearch spring.data.elasticsearch.cluster-nodes=10.0.2.23:9300 #spring.data.elasticsearch.cluster-nodes=10.0.2.22:9300
生產者:
/**
* @author xiaofeng
* @version V1.0
* @title: TestKafkaSender.java
* @package: com.example.demo.kafka.sender
* @description: kafka生產者
* @date 2018/4/2 0002 下午 3:31
*/
@Component
public class TestKafkaSender {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${kafka.test.topic}")
String testTopic;
public void sendTest(String msg){
kafkaTemplate.send(testTopic, msg);
}
}
消費者1:
/**
* @author xiaofeng
* @version V1.0
* @title: TestKafkaConsumer2.java
* @package: com.example.demo.kafka.consumer
* @description: kafka消費者
* @date 2018/4/2 0002 下午 3:31
*/
@Component
public class TestKafkaConsumer {
Logger logger = LoggerFactory.getLogger(getClass());
/**
* topics: 配置消費topic,以陣列的形式可以配置多個
* groupId: 配置消費組為”xiaofeng1“
*
* @param message
*/
@KafkaListener(topics = {"${kafka.test.topic}"},groupId = "xiaofeng1")
public void consumer(String message) {
logger.info("groupId = xiaofeng1, message = " + message);
}
}
消費者2:
/**
* @author xiaofeng
* @version V1.0
* @title: TestKafkaConsumer2.java
* @package: com.example.demo.kafka.consumer
* @description: kafka消費者
* @date 2018/4/2 0002 下午 3:31
*/
@Component
public class TestKafkaConsumer2 {
Logger logger = LoggerFactory.getLogger(getClass());
/**
* topics: 配置消費topic,以陣列的形式可以配置多個
* groupId: 配置消費組為”xiaofeng2“
*
* @param message
*/
@KafkaListener(topics = {"${kafka.test.topic}"}, groupId = "xiaofeng2")
public void consumer(String message) {
logger.info("groupId = xiaofeng2, message = " + message);
}
}
測試類:
@Autowired
TestKafkaSender sender;
@Test
public void send() {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
logger.info("send message = " + i);
sender.sendTest(i + "");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
執行效果: