Spring Boot: Configuring Kafka for Consumer-Group Message Subscription

The Kafka version used here is kafka_2.11-2.0.1, with JDK 1.8 and zookeeper-3.3.6. Kafka runs on the JVM and uses ZooKeeper as its registry, so all three pieces of software need to be installed. For easy testing, this article uses the Windows builds of Kafka and ZooKeeper.

Environment Setup Steps

1. Install the JDK and configure its environment variables (omitted here; guides are easy to find online).

2. Configure ZooKeeper by editing its configuration file:

Rename zoo_sample.cfg to zoo.cfg and set the ZooKeeper client port.

Start ZooKeeper by double-clicking zkServer.cmd in the bin directory.
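A minimal zoo.cfg for local testing might look like the following; the dataDir path is an example and should point at your own install directory:

tickTime=2000
dataDir=E:/worksoft/zookeeper-3.3.6/data
clientPort=2181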

3. Install and configure Kafka:

3.1 Download the release from http://kafka.apache.org/downloads.html
3.2 Extract the archive (here to E:\worksoft\kafka_2.11-2.0.1)
3.3 Open E:\worksoft\kafka_2.11-2.0.1\config
3.4 Open server.properties in a text editor
3.5 Change log.dirs to "E:\worksoft\kafka_2.11-2.0.1\kafka-logs" (in a .properties file, use forward slashes or escaped backslashes, e.g. E:/worksoft/kafka_2.11-2.0.1/kafka-logs, since a single backslash is treated as an escape character)
3.6 Open a cmd window
3.7 Change into the Kafka directory: cd /d E:\worksoft\kafka_2.11-2.0.1\
3.8 Run the following to start Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
 
4. Create a topic:
4.1 Open cmd and change into E:\worksoft\kafka_2.11-2.0.1\bin\windows
4.2 Create a topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
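To confirm the topic was created, you can list all topics (a standard kafka-topics option):
kafka-topics.bat --list --zookeeper localhost:2181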
 

5. Start a producer:
cd /d E:\worksoft\kafka_2.11-2.0.1\bin\windows
kafka-console-producer.bat --broker-list localhost:9092 --topic test
 

6. Start a consumer (Kafka 2.0 removed the old --zookeeper option from the console consumer, so connect to the broker directly):
cd /d E:\worksoft\kafka_2.11-2.0.1\bin\windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

List all consumer groups in Kafka:

./kafka-consumer-groups.bat  --bootstrap-server 127.0.0.1:9092 --list

Check the consumption lag of a particular consumer group (the LAG column in the output shows how many messages remain unconsumed per partition):

kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --group lt_datacenter_processor_kafka_to_len_consumer --describe

Check the partition details of a topic:

kafka-topics.bat --zookeeper 127.0.0.1:2181 --topic lenovo --describe

Next, integrate Kafka into Spring Boot; the consumer configuration is switched on by an annotation on the application class.

Add the Kafka dependencies to the pom file:

<!-- Core Spring Kafka support -->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.3.5.RELEASE</version>
</dependency>

<!-- Provides the AdminClient / AdminClientConfig management API -->
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.0.1</version>
</dependency>
Define a custom enabling annotation that imports the consumer configuration:

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Import;

/**
 * Automatically enables the Kafka consumer configuration.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(KafkaConsumerConfiguration.class)
@Documented
public @interface EnableKafkaConsumer {
}
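The annotation is then applied to the Spring Boot application class, for example (the class name here is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableKafkaConsumer
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

The configuration class that the annotation imports: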
@Configuration // ensures the @Bean method cross-references below return the singleton beans
@EnableKafka
public class KafkaConsumerConfiguration {
    public static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfiguration.class);

    /**
     * Kafka properties bound from the application configuration.
     */
    @Resource
    private KafkaProperties kafkaProperties;


    /**
     * Custom thread pool used for the consumer threads.
     *
     * @return
     */
    @Bean(name = "consumerTaskExecutor")
    public ThreadPoolTaskExecutor consumerTaskExecutor() {


        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // number of core threads kept in the pool
        executor.setCorePoolSize(20);
        //executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        // maximum number of threads in the pool
        executor.setMaxPoolSize(200);
        // thread name prefix
        executor.setThreadNamePrefix("kafkaThread-C-");
        // idle time a thread may stay alive before being reclaimed
        executor.setKeepAliveSeconds(30 * 60);
        // allow core threads to time out and be reclaimed as well
        executor.setAllowCoreThreadTimeOut(true);
        // rejection policy: run the rejected task on the calling thread (CallerRunsPolicy)
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //executor.setWaitForTasksToCompleteOnShutdown(true);
        // capacity of the task queue
        executor.setQueueCapacity(10000);
        executor.initialize();

        return executor;
    }


    /**
     * Factory subclass that overrides container initialization to plug in the custom thread pool.
     *
     * @param <K>
     * @param <V>
     */
    public class CustomConcurrentKafkaListenerContainerFactory<K, V> extends ConcurrentKafkaListenerContainerFactory<K, V> {

        /**
         * The executor for threads that poll the consumer.
         */
        private AsyncListenableTaskExecutor consumerTaskExecutor;


        public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor) {
            this.consumerTaskExecutor = consumerTaskExecutor;
        }

        @Override
        protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) {
            super.initializeContainer(instance);
            instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor);
        }
    }

    /**
     * Registers the consumer listener container factory.
     *
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(consumerTaskExecutor());

        if (null != kafkaProperties.getListener().getConcurrency()) {
            factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        }
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
 
    /**
     * Registers the consumer factory.
     *
     * @return
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    /**
     * Registers the consumer. Note that KafkaConsumer is not thread-safe,
     * so this bean must only be used from a single thread at a time.
     *
     * @return
     */
    @Bean
    public KafkaConsumer<Object, Object> consumer() {

        log.info("<<<<<<<<<<<<<<< loading the KafkaConsumer service >>>>>>>>>>>>>>>>>>");

        return new KafkaConsumer<>(kafkaProperties.buildConsumerProperties());
    }


    /**
     * KafkaAdmin configuration.
     *
     * @return
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {

        log.info("<<<<<<<<<<<<<<< loading the KafkaAdmin service >>>>>>>>>>>>>>>>>>");

        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        // to use an embedded Kafka broker instead:
        //configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }

}
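With the factory registered, a listener can be declared with @KafkaListener. Below is a minimal sketch; the topic name "test" and the class name are assumptions, and the List parameter matches the factory's setBatchListener(true):

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoBatchListener {

    // receives a batch of records per poll because the factory is configured as a batch listener
    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
    public void onMessages(List<ConsumerRecord<String, String>> records) {
        records.forEach(r -> System.out.println(
                "partition=" + r.partition() + " offset=" + r.offset() + " value=" + r.value()));
    }
}

A utility class can also expose the KafkaConsumer bean statically: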
@Component
public class KafkaConsumerUtils {

    private static KafkaConsumer consumer;

    public KafkaConsumerUtils() {
    }

    @Autowired
    public KafkaConsumerUtils(KafkaConsumer kafkaConsumer) {
        KafkaConsumerUtils.consumer = kafkaConsumer;
        System.out.println("kafka消費者初始化完成...");
    }

    private static volatile KafkaConsumerUtils instance;

    /**
     * Thread-safe singleton access using double-checked locking.
     *
     * @return
     */
    public static KafkaConsumerUtils getInstance() {
        // fast path: once instance is non-null, return it without entering the synchronized block
        if (instance == null) {
            // synchronize so the instance is created only once under concurrent access
            synchronized (KafkaConsumerUtils.class) {
                // re-check inside the lock before initializing
                if (instance == null) {
                    instance = new KafkaConsumerUtils();
                    //instance.afterPropertySet(); // initialize configuration and the pool
                }
            }
        }
        return instance;
    }

}

 

The application.yml configuration:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # Kafka broker address (not the ZooKeeper address)
#    client-id: wi_datacenter_processor # custom client id; combined with concurrency it causes "Error registering AppInfo mbean"
    listener:
      #ack-mode: batch # commit offsets in batches
      #ack-time:
#      poll-timeout: 5000 # poll timeout
    consumer:
      group-id: lt_datacenter_processor_lenovo
      auto-offset-reset: earliest
      auto-commit-interval: 1000
      enable-auto-commit: true # offsets are committed automatically; set to false for manual commits
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 5 # maximum number of records returned by a single poll
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
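
Finally, a manually driven consumer that polls in its own thread, started through CommandLineRunner: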

import cn.litsoft.configuration.SpringContextUtil;
import cn.litsoft.iot.ailog.constant.ConstantGlobal;
import cn.litsoft.iot.common.utils.thread.ThreadUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

/**
 * Kafka consumer for TRS data.
 */
@Component
@ConditionalOnProperty(name = "media.kafka.consumer.enabled", havingValue = "true") // started only when this property is set to true
public class KafaConsumerManual implements CommandLineRunner {
    public static final Logger log = LoggerFactory.getLogger(KafaConsumerManual.class);

    public KafaConsumerManual() {
        log.info("Initializing KafaConsumerManual");
    }


    @Autowired
    private KafkaProperties kafkaProperties;

    public KafkaConsumer kafkaConsumer() {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, ConstantGlobal.KAFKA_TO_ES_CONSUMER_GROUP_ID);
        return new KafkaConsumer(props);
    }

    /**
     * Short-lived tasks are handled by the default thread pool.
     */
    @Resource(name = "manualTaskExecutor")
    private ThreadPoolTaskExecutor taskExecutor;

    @Override
    public void run(String... strings) {

        KafkaConsumer consumer = kafkaConsumer();
        taskExecutor.execute(new ThreadRunnable(consumer)); // hand the polling loop off to the thread pool
    }


    public class ThreadRunnable implements Runnable {

        private KafkaConsumer consumer;

        public ThreadRunnable(KafkaConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {

            // subscribe to the topic
            consumer.subscribe(Arrays.asList("lenovo"));
            ThreadPoolTaskExecutor taskExecutor = SpringContextUtil.getBean("lkTaskExecutor"); // worker thread pool

            // poll the broker for data in an endless loop
            ConsumerRecords<String, String> list = null;
            while (true) {
                list = consumer.poll(100);

                if (null == list || list.count() == 0) {
                    log.info("[KafaConsumerManual] poll returned no records; waiting before the next poll");
                    ThreadUtil.sleep(60 * 1000);
                    continue;
                }

                log.info("[KafaConsumerManual] fetched [{}] records", list.count());

                final CountDownLatch countDownLatch = new CountDownLatch(list.count());

                list.forEach(record -> taskExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                        if (kafkaMessage.isPresent()) {
                            try {
                                System.out.println("獲取到kafka訊息   :"+kafkaMessage.get().toString());
//                                HashMap hashMap = (HashMap) JsonHelper.jsonToMap(kafkaMessage.get().toString());

                            } catch (Exception ex) {
                                ThreadUtil.sleep(1000);//暫停一會,防止異常資訊刷屏
                                log.error("[KafaConsumerManual]發生異常:[{}]", ex);
                            } finally {
                                countDownLatch.countDown();
                            }

                        } else {
                            countDownLatch.countDown();
                        }
                    }
                }));

                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    consumer.commitAsync();
                }
            }

        }
    }
}
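To start this manual consumer, enable the property checked by @ConditionalOnProperty in application.yml:

media:
  kafka:
    consumer:
      enabled: true

One caveat: the run loop commits offsets itself with commitAsync(), so for manual commits to be the only commit path, enable-auto-commit should be set to false in the consumer configuration above; otherwise the client keeps auto-committing in the background as well.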