SpringBoot(17)
阿新 • • 發佈:2018-12-19
Apache Kafka的支援
1. 簡介
spring-kafka專案的自動配置來支援Apache Kafka。
Kafka可以通過spring.kafka.*屬性配置。例如,可以在application.properties宣告以下配置:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
檢視org.springframework.boot.autoconfigure.kafka.KafkaProperties獲取更多屬性配置。
2. 傳送訊息
Spring自動配置了KafkaTemplate,可以通過@Autowired注入。
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
3. 接收訊息
當存在Apache Kafka基礎結構時,任何通過@KafkaListener註解的bean可以建立監聽器端點。假如沒有定義KafkaListenerContainerFactory,將通過spring.kafka.listener.*屬性自動配置一個預設值。
在someTopic的Topic建立一個監聽器端點:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
4. 額外的Kafka屬性
這些屬性中的前幾個適用於生產者和消費者,但如果希望為每個屬性使用不同的值,則可以在生產者或消費者級別指定。 Apache Kafka指定具有重要性的屬性:HIGH,MEDIUM和LOW。 Spring Boot自動配置支援所有HIGH重要性屬性,一些選擇MEDIUM和LOW,以及任何沒有預設值的屬性。
只有Kafka支援的屬性的子集可通過KafkaProperties類獲得。 如果希望使用不直接支援的其他屬性配置生產者或使用者,使用以下命令:
spring.kafka.properties.foo.bar=baz
這將Kafka常見的foo.bar屬性設定為baz。
這些屬性將由使用者和生產者工廠bean共享。 如果希望使用不同的屬性自定義這些元件,例如可以為每個元件使用不同的度量標準閱讀器去覆蓋預設bean的定義,如下所示:
@Configuration
public static class CustomKafkaBeans {
/**
* 自定義ProducerFactory bean.
* @param properties kafka屬性
* @return the bean.
*/
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MyProducerMetricsReporter.class);
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
}
/**
* 自定義ConsumerFactory bean.
* @param properties kafka屬性
* @return the bean.
*/
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
consumerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MyConsumerMetricsReporter.class);
return new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);
}
}