spirngboot 配置kafka實現group訂閱訊息
本人所使用的kafka版本是kafka_2.11-2.0.1版本,jdk1.8、zookeeper-3.3.6,kafka運行於JVM環境,依賴zookeeper註冊中心,所以需要準備這三個軟體,本文測試使用的是windows版本的kafka和zookeeper便於測試
環境搭建 步驟
1、搭建jdk環境,配置環境變數(省略,請自行百度搜索)
2、zookeeper環境配置,修改zookeeper配置檔案
將zoo_sample.cfg 修改為zoo.cfg檔案,修改zookeeper的訪問埠
啟動zookeeper,在bin目錄下zkServer.cmd雙擊即可
3.1 下載安裝檔案: http://kafka.apache.org/downloads.html
3.2 解壓檔案(本文解壓到 E:\worksoft\kafka_2.11-2.0.1)
3.3 開啟E:\worksoft\kafka_2.11-2.0.1\config
3.4 從文字編輯器裡開啟 server.properties
3.5 把 log.dirs的值改成 “E:\worksoft\kafka_2.11-2.0.1\kafka-logs”
3.6 開啟cmd
3.7 進入kafka檔案目錄: cd /d E:\worksoft\kafka_2.11-2.0.1\
3.8 輸入並執行以開啟kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
4. 建立topics
4.1 開啟cmd 並進入E:\worksoft\kafka_2.11-2.0.1\bin\windows
4.2 建立一個topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
5. 開啟一個Producer:
cd /d E:\worksoft\kafka_2.11-2.0.1\bin\windows
kafka-console-producer.bat --broker-list localhost:9092 --topic test
6. 開啟一個Consumer:
cd /d E:\worksoft\kafka_2.11-2.0.1\bin\windows
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
檢視kafka中的所有組
./kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --list
檢視某個consumer組的消費堆積量
kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --group lt_datacenter_processor_kafka_to_len_consumer --describe
檢視kafka某個topic下partition資訊
kafka-topics.bat --zookeeper 127.0.0.1:2181 --topic lenovo --describe
springboot整合kafka。通過在啟動類上開啟註解方式啟動
pom檔案裡面新增kafka依賴jar包
<!-- 提供kafka核心工具包 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency> <!-- 提供AdminClient管理 AdminClientConfig--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.1</version> </dependency>
/** * 自動啟用 Kafka */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(KafkaConsumerConfiguration.class) @Documented public @interface EnableKafkaConsumer { }
@EnableKafka public class KafkaConsumerConfiguration { public static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfiguration.class); /** * 引用kafkaProperties屬性類 */ @Resource private KafkaProperties kafkaProperties; /** * 消費者執行緒使用自定義執行緒池 * * @return */ @Bean(name = "consumerTaskExecutor") public ThreadPoolTaskExecutor consumerTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //執行緒池活躍的執行緒數 executor.setCorePoolSize(20); //executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); //執行緒池最大活躍的執行緒數 executor.setMaxPoolSize(200); //執行緒名稱字首 executor.setThreadNamePrefix("kafkaThread-C-"); //執行緒池維護執行緒所允許的空閒時間 executor.setKeepAliveSeconds(30 * 60); //核心執行緒池也會請0 executor.setAllowCoreThreadTimeOut(true); //執行緒池拒絕機制,拋棄最久的執行緒 // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //executor.setWaitForTasksToCompleteOnShutdown(true); //執行緒池佇列 executor.setQueueCapacity(10000); executor.initialize(); return executor; } /** * 重寫設定自定義執行緒池 * * @param <K> * @param <V> */ public class CustomConcurrentKafkaListenerContainerFactory<K, V> extends ConcurrentKafkaListenerContainerFactory<K, V> { /** * The executor for threads that poll the consumer. */ private AsyncListenableTaskExecutor consumerTaskExecutor; public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor) { this.consumerTaskExecutor = consumerTaskExecutor; } @Override protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) { super.initializeContainer(instance); instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor); } } /** * 注入消費者監聽器 * * @return */ @Bean public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(consumerTaskExecutor()); if (null != kafkaProperties.getListener().getConcurrency()) { factory.setConcurrency(kafkaProperties.getListener().getConcurrency()); } factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); return factory; } /** * 注入消費者工廠 * * @return */ @Bean public ConsumerFactory<Object, Object> consumerFactory() { return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); } /** * 注入消費者 * * @return */ @Bean public KafkaConsumer<Object, Object> consumer() { log.info("<<<<<<<<<<<<<<< 載入 KafkaConsumer 服務 >>>>>>>>>>>>>>>>>>"); return new KafkaConsumer<>(kafkaProperties.buildConsumerProperties()); } /** * Admin管理配置 * * @return */ @Bean public KafkaAdmin kafkaAdmin() { log.info("<<<<<<<<<<<<<<< 載入 KafkaAdmin 服務 >>>>>>>>>>>>>>>>>>"); Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); // 使用內建的Kafka //configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses())); return new KafkaAdmin(configs); } }
@Component public class KafkaConsumerUtils { private static KafkaConsumer consumer; public KafkaConsumerUtils() { } @Autowired public KafkaConsumerUtils(KafkaConsumer kafkaConsumer) { KafkaConsumerUtils.consumer = kafkaConsumer; System.out.println("kafka消費者初始化完成..."); } private static volatile KafkaConsumerUtils instance; /** * 單例模式,執行緒安全 * * @return * @throws Exception */ public static KafkaConsumerUtils getIstance() { // 物件例項化時與否判斷(不使用同步程式碼塊,instance不等於null時,直接返回物件,提高執行效率) if (instance == null) { //同步程式碼塊(物件未初始化時,使用同步程式碼塊,保證多執行緒訪問時物件在第一次建立後,不再重複被建立) synchronized (KafkaConsumerUtils.class) { //未初始化,則初始instance變數 if (instance == null) { instance = new KafkaConsumerUtils(); //instance.afterPropertySet();//初始化配置和Pool池 } } } return instance; } }
spring: kafka: bootstrap-servers: 127.0.0.1:9092 #zookeeper配置地址 # client-id: wi_datacenter_processor #自定義客戶端,併發設定此項會造成異常Error registering AppInfo mbean listener: #ack-mode: batch # 批量提交Offset #ack-time: # poll-timeout: 5000 #pool Time consumer: group-id: lt_datacenter_processor_lenovo auto-offset-reset: earliest auto-commit-interval: 1000 enable-auto-commit: true #手動提交Offset key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 5 # 每次返回100條資料 producer: retries: 0 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
import cn.litsoft.configuration.SpringContextUtil; import cn.litsoft.configuration.kafka.KafkaConsumerConfiguration; import cn.litsoft.iot.ailog.constant.ConstantGlobal; import cn.litsoft.iot.common.utils.thread.ThreadUtil; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Arrays; import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; /** * trs資料消費kafka */ @Component @ConditionalOnProperty(name = "media.kafka.consumer.enabled", havingValue = "true") // 開啟註解才會啟動 public class KafaConsumerManual implements CommandLineRunner { public static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfiguration.class); public KafaConsumerManual() { log.info("初始化:LenovoKafKaContentConsumer"); } @Autowired private KafkaProperties kafkaProperties; public KafkaConsumer kafkaConsumer() { Map<String, Object> props = kafkaProperties.buildConsumerProperties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, ConstantGlobal.KAFKA_TO_ES_CONSUMER_GROUP_ID); return new KafkaConsumer(props); } /** * 簡短任務使用預設執行緒池處理 */ @Resource(name = "manualTaskExecutor") private ThreadPoolTaskExecutor taskExecutor; @Override public void run(String... strings) { KafkaConsumer consumer = kafkaConsumer(); taskExecutor.execute(new ThreadRunnable(consumer));// 丟 到執行緒池裡面去執行 } public class ThreadRunnable implements Runnable { private KafkaConsumer consumer; public ThreadRunnable(KafkaConsumer consumer) { this.consumer = consumer; } @Override public void run() { // 訂閱主題 consumer.subscribe(Arrays.asList("lenovo")); ThreadPoolTaskExecutor taskExecutor = SpringContextUtil.getBean("lkTaskExecutor");//執行緒池子 //死迴圈不停的從broker中拿資料 ConsumerRecords<String, String> list = null; while (true) { list = consumer.poll(100); if (null == list || list.count() == 0) { log.info("業務【KafaConsumerManual】取出資料為空,休息一段時間進行等待"); ThreadUtil.sleep(60 * 1000); continue; } log.info("業務【KafaConsumerManual】取出資料[{}]條", list.count()); final CountDownLatch countDownLatch = new CountDownLatch(list.count()); list.forEach(record -> taskExecutor.execute(new Runnable() { @Override public void run() { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { try { System.out.println("獲取到kafka訊息 :"+kafkaMessage.get().toString()); // HashMap hashMap = (HashMap) JsonHelper.jsonToMap(kafkaMessage.get().toString()); } catch (Exception ex) { ThreadUtil.sleep(1000);//暫停一會,防止異常資訊刷屏 log.error("[KafaConsumerManual]發生異常:[{}]", ex); } finally { countDownLatch.countDown(); } } else { countDownLatch.countDown(); } } })); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.commitAsync(); } } } } }