1. 程式人生 > >SpringBoot(17)

SpringBoot(17)

Apache Kafka的支援

1. 簡介

spring-kafka專案的自動配置來支援Apache Kafka。

Kafka可以通過spring.kafka.*屬性配置。例如,可以在application.properties宣告以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

檢視org.springframework.boot.autoconfigure.kafka.KafkaProperties獲取更多屬性配置。

2. 傳送訊息

Spring自動配置了KafkaTemplate,可以通過@Autowired注入。

@Component
public class MyBean {
    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    // ...
}

3. 接收訊息

當存在Apache Kafka基礎結構時,任何通過@KafkaListener註解的bean可以建立監聽器端點。假如沒有定義KafkaListenerContainerFactory,將通過spring.kafka.listener.*屬性自動配置一個預設值。

在someTopic的Topic建立一個監聽器端點:

@Component
public class MyBean {
    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }
}

4. 額外的Kafka屬性

這些屬性中的前幾個適用於生產者和消費者,但如果希望為每個屬性使用不同的值,則可以在生產者或消費者級別指定。 Apache Kafka指定具有重要性的屬性:HIGH,MEDIUM和LOW。 Spring Boot自動配置支援所有HIGH重要性屬性,一些選擇MEDIUM和LOW,以及任何沒有預設值的屬性。

只有Kafka支援的屬性的子集可通過KafkaProperties類獲得。 如果希望使用不直接支援的其他屬性配置生產者或使用者,使用以下命令:

spring.kafka.properties.foo.bar=baz

這將Kafka常見的foo.bar屬性設定為baz。

這些屬性將由使用者和生產者工廠bean共享。 如果希望使用不同的屬性自定義這些元件,例如可以為每個元件使用不同的度量標準閱讀器去覆蓋預設bean的定義,如下所示:

@Configuration
public static class CustomKafkaBeans {
    /**
     * 自定義ProducerFactory bean.
     * @param properties kafka屬性
     * @return the bean.
     */
    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
        Map<String, Object> producerProperties = properties.buildProducerProperties();
        producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MyProducerMetricsReporter.class);
        return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
    }

    /**
     * 自定義ConsumerFactory bean.
     * @param properties kafka屬性
     * @return the bean.
     */
    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MyConsumerMetricsReporter.class);
        return new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);
    }
}