1. 程式人生 > 其它 >kafka消費者配置

kafka消費者配置

最近的業務接觸了一下訊息中介軟體 kafka,其他客戶端負責傳送,我方負責接收消費。功能很簡單,但是我們要求把訊息入庫,而且資料量每天達到了千萬級別,壓力相當大!

廢話不多說,上程式碼!

由於訊息需要入庫,我使用的是手動提交訊息;如果不入庫、不要求準確性,使用自動提交就ok

消費者配置類

package com.asiainfo.hsop.server.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

import java.util.HashMap;
import java.util.Map;
/**
 * kafka訊息佇列消費者 配置類
 *
 * @author cyc
 * @version 1.0.0
 * @date 2021-07-02
 *
 * */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /*
     * NOTE: before starting the application, edit etc/hosts and map each Kafka
     * broker host name to its IP, e.g.
     *     11.251.10.111 sss-nn-01
     * Without the mapping the client loops forever on:
     *     Discovered group coordinator......
     *     (Re-)joining group......
     *     Marking the coordinator......
     * Upgrading kafka-clients did not help; fixing the hosts file did.
     */

    /**
     * Builds the common consumer properties for the given consumer group.
     *
     * <p>Auto-commit is disabled because every message must be written to the
     * database before its offset is acknowledged; the listener commits manually
     * after a successful insert.
     *
     * @param consumerGroupId consumer group id to register under
     * @return consumer configuration map for {@link DefaultKafkaConsumerFactory}
     */
    public Map<String, Object> consumerConfig(String consumerGroupId) {
        Map<String, Object> props = new HashMap<>();
        // Kafka broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.253.6.194:6667,10.253.6.195:6667,10.253.6.196:6667,10.253.6.197:6667,10.253.6.198:6667,10.253.6.199:6667,10.253.6.200:6667,10.253.6.201:6667");
        // Manual offset commit: acknowledge only after the DB insert succeeds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Auto-commit interval; only relevant if auto-commit is re-enabled.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session timeout: the broker considers this consumer dead if no
        // heartbeat arrives within this window.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // Consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        // earliest = consume from the beginning; latest = only new messages;
        // none = throw if the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Plain string key/value deserialization.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory(String consumerGroupId) {
        return new DefaultKafkaConsumerFactory<>(consumerConfig(consumerGroupId));
    }

    /**
     * Shared builder for the listener container factories below; replaces the
     * original copy/paste triplication of identical container settings.
     *
     * @param consumerGroupId consumer group id for this container's consumers
     * @return a fully configured listener container factory
     */
    private ConcurrentKafkaListenerContainerFactory<String, String> buildFactory(String consumerGroupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(consumerGroupId));
        // Single listener thread: with several threads inserting concurrently,
        // an auto-increment DB id has caused deadlocks — switch to UUID keys
        // before raising this value.
        factory.setConcurrency(1);
        // Max time poll() blocks waiting for records. The consumer pulls from
        // the broker (poll model) instead of having records pushed to it, so a
        // slow consumer is not flooded.
        factory.getContainerProperties().setPollTimeout(3000);
        // MANUAL ack mode is required whenever the listener method takes an
        // Acknowledgment parameter; otherwise Spring fails with
        // "No Acknowledgment available as an argument, the listener container
        // must have a MANUAL AckMode to populate the Acknowledgment."
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        // Whichever of the two thresholds below is reached first wins:
        factory.getContainerProperties().setAckCount(10);   // commit every 10 records
        factory.getContainerProperties().setAckTime(10000); // or every 10 seconds
        return factory;
    }

    /*
     * To listen with an additional consumer group: add a new @Bean method with
     * a unique bean name, delegate to buildFactory("<group-id>"), and reference
     * the bean name via containerFactory="..." on the @KafkaListener.
     */
    @Bean(name = "kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        // FIX: the original left setAckMode(MANUAL) commented out for this
        // factory even though the listener bound to it takes an Acknowledgment,
        // which triggers the exact error quoted in buildFactory().
        return buildFactory("etctu_tradeinfolist_test");
    }

    @Bean(name = "kafkaListenerContainerFactory2")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
        return buildFactory("exit_toll_host_test");
    }

    @Bean(name = "kafkaListenerContainerFactory3")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory3() {
        return buildFactory("entryinfo_test");
    }
}

 

消費者類,裡面包含了Hbase 引入的包,沒啥用,刪掉就行

package com.asiainfo.hsop.server.kafka;

import com.alibaba.fastjson.JSON;
import com.asiainfo.hsop.base.po.data.kafka.HospKafkaDetail;

import com.asiainfo.hsop.common.utils.HBaseUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.math.BigDecimal;
import java.text.ParseException;
import java.util.*;

/**
 *
 * @author cyc
 *
 * @date 2021-07-02
 *
 *
 *
 *
 * */
@Component
public class Consumer {

    private Logger logger= LoggerFactory.getLogger(Consumer.class);




        @KafkaListener(topics = "highwayetctu",containerFactory="kafkaListenerContainerFactory")
        public void consumer1(ConsumerRecord<?, ?> record, Acknowledgment ack) throws InterruptedException, ParseException, IOException {

            //具體業務

        //手動提交 ack.acknowledge(); } @KafkaListener(topics = "exitList_0706",containerFactory="kafkaListenerContainerFactory2") public void consumer2(ConsumerRecord<?, ?> record,Acknowledgment ack ) throws IOException, ParseException {         //具體業務
        //手動提交 ack.acknowledge(); } @KafkaListener(topics = "entryinfo",containerFactory="kafkaListenerContainerFactory3") public void consumer3(ConsumerRecord<?, ?> record,Acknowledgment ack ) throws IOException, ParseException {   //具體業務
      
        //手動提交 ack.acknowledge(); } }