1. 程式人生 > >Spring boot 下使用Kafka例項

Spring boot 下使用Kafka例項

一個簡單的例項
步驟:

 1. 新增pom
 2. 編寫配置檔案(生產者,消費者)
 3. 測試

1. 新增pom###

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2. 編寫配置檔案###

生產者配置

import java.util.HashMap;
import
java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import
org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; @Configuration @EnableKafka public class KafkaProducerConfig { public Map<String, Object> producerConfigs() { Map<String, Object> props = new
HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.20.25.171:9092"); // 如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重複訊息的可能性 props.put(ProducerConfig.RETRIES_CONFIG, 0); /** * Server完成 producer request 前需要確認的數量。 acks=0時,producer不會等待確認,直接新增到socket等待發送; * acks=1時,等待leader寫到local log就行; acks=all或acks=-1時,等待isr中所有副本確認 (注意:確認都是 broker * 接收到訊息放入記憶體就直接返回確認,不是需要等待資料寫入磁碟後才返回確認,這也是kafka快的原因) */ // props.put("acks", "all"); /** * Producer可以將發往同一個Partition的資料做成一個Produce * Request傳送請求,即Batch批處理,以減少請求次數,該值即為每次批處理的大小。 * 另外每個Request請求包含多個Batch,每個Batch對應一個Partition,且一個Request傳送的目的Broker均為這些partition的leader副本。 * 若將該值設為0,則不會進行批處理 */ props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);// /** * 預設緩衝可立即傳送,即遍緩衝空間還沒有滿,但是,如果你想減少請求的數量,可以設定linger.ms大於0。 * 這將指示生產者傳送請求之前等待一段時間,希望更多的訊息填補到未滿的批中。這類似於TCP的演算法,例如上面的程式碼段, * 可能100條訊息在一個請求傳送,因為我們設定了linger(逗留)時間為1毫秒,然後,如果我們沒有填滿緩衝區, * 這個設定將增加1毫秒的延遲請求以等待更多的訊息。 需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 * linger.ms=0。在不處於高負載的情況下,如果設定比0大,以少量的延遲代價換取更少的,更有效的請求。 */ props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /** * 控制生產者可用的快取總量,如果訊息傳送速度比其傳輸到伺服器的快,將會耗盡這個快取空間。 * 當快取空間耗盡,其他傳送呼叫將被阻塞,阻塞時間的閾值通過max.block.ms設定, 之後它將丟擲一個TimeoutException。 */ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }

消費者配置

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import com.example.demo.mq.Consumer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(5000);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.20.25.171:9092");
        /**
         * 如果設定成true,偏移量由auto.commit.interval.ms控制自動提交的頻率。
         * 
         * 如果設定成false,不需要定時的提交offset,可以自己控制offset,當訊息認為已消費過了,這個時候再去提交它們的偏移量。
         * 這個很有用的,當消費的訊息結合了一些處理邏輯,這個訊息就不應該認為是已經消費的,直到它完成了整個處理。
         */
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //提交延遲毫秒數
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        //執行超時時間
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //組ID
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        /**
         * 在consumter端配置檔案中(或者是ConsumerConfig類引數)有個"autooffset.reset"(在kafka 0.8版本中為auto.offset.reset),
         * 有2個合法的值"largest"/"smallest",預設為"largest",此配置引數表示當此groupId下的消費者,在ZK中沒有offset值時(比如新的groupId,或者是zk資料被清空),
         * consumer應該從哪個offset開始消費.largest表示接受接收最大的offset(即最新訊息),smallest表示最小offset,即從topic的開始位置消費所有訊息.
         */
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }

    @Bean
    public Consumer listener() {
        return new Consumer();
    }
}

註解寫的已經很清楚了

3. 測試###

生產者


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@EnableScheduling
public class Producer {
    @Autowired
    private KafkaTemplate<?, String> kafkaTemplate;

    @Scheduled(fixedDelay = 5000)
    public void send() {
        log.info("生產者 :{}",
                "gaha_hero" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
        kafkaTemplate.send("test-topic",
                "gaha_hero" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
        //send引數 1:topic,2:key,3:引數
        // 預設情況下,Kafka根據傳遞訊息的key來進行分割槽的分配,即hash(key) % numPartitions,沒有指定key就隨機分配一個分割槽
    }

}

消費者

import org.springframework.kafka.annotation.KafkaListener;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Consumer {

     @KafkaListener(topics = {"test-topic"})
        public void consumer(String message){
            log.info("消費者: {}", message);
        }
}