
A Small Example of Multiple Kafka Consumers


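import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FirstMultiConsumerThreadDemo2 {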
    public static final String brokerList = "10.211.55.3:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//        props.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumerThread consumerThread = new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors());
        consumerThread.start();
    }

    public static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String,String> kafkaConsumer;
        private ExecutorService executorService;
        private int threadNumber;

        public KafkaConsumerThread(Properties props,String topic,int threadNumber) {
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Collections.singletonList(topic));
            this.threadNumber = threadNumber;
            executorService = new ThreadPoolExecutor(threadNumber,threadNumber,0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
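                        // Wrap the batch of messages (records) in a task and submit it to the thread pool.
                        // Processing is usually the slowest step; doing it asynchronously keeps
                        // business logic from holding up the poll loop.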
                        executorService.submit(new RecordHandler(records));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
                // Stop accepting new tasks; batches already submitted still finish.
                executorService.shutdown();
            }
        }
    }

    // A task for the pool only needs to be a Runnable; it is never started as its own thread.
    public static class RecordHandler implements Runnable {
        public final ConsumerRecords<String,String> records;
        public RecordHandler(ConsumerRecords<String,String> records){
            this.records = records;
        }

        @Override
        public void run() {
            // process the records
        }
    }


}

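The RecordHandler class processes the messages. Note the last argument passed to the thread pool: CallerRunsPolicy. When the pool's work queue is full, the submitting thread (here, the thread that calls poll()) runs the task itself instead of having it rejected. Polling therefore slows down whenever the pool's overall processing capacity cannot keep up with poll(), which prevents rejected-task exceptions.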
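The effect of CallerRunsPolicy can be seen without a Kafka broker. The following is a minimal sketch, not part of the original example: the class name `CallerRunsDemo`, the method `runTasks`, and the deliberately tiny pool (one worker, queue of one) are all assumptions chosen so that the pool saturates quickly. It records which threads actually ran the tasks.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    // Simulates slow business logic for one batch (hypothetical stand-in).
    private static void slowWork() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits `tasks` slow tasks and returns the names of the threads that ran them.
    public static Set<String> runTasks(int tasks) {
        // Assumption for the demo: one worker and a queue of one, so the pool fills up fast.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            // When worker and queue are both busy, this call runs the task on the caller's thread.
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                slowWork();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(5));
    }
}
```

The returned set should contain the submitting thread's name alongside the pool worker's name. In the consumer above, the submitting thread is the one calling poll(), so while it is busy running a task it cannot fetch more records, which acts as back-pressure.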

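Generally, poll() fetches messages very quickly, and the bottleneck of overall consumption lies in processing them. Handling that step asynchronously therefore raises overall consumption throughput.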

As shown in the figure below:

Message processing is changed to a multithreaded implementation.
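The throughput argument can be illustrated with a rough timing sketch that needs no broker. Everything here is hypothetical: `AsyncHandlingDemo`, `handleBatch`, and the 100 ms sleep stand in for real business logic, and `Executors.newFixedThreadPool` (which uses an unbounded queue) replaces the bounded pool of the example above purely for brevity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncHandlingDemo {

    // Stand-in for slow business logic applied to one polled batch.
    private static void handleBatch() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Handles every batch on the polling thread itself; returns elapsed milliseconds.
    public static long handleInline(int batches) {
        long start = System.nanoTime();
        for (int i = 0; i < batches; i++) {
            handleBatch();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Hands every batch to a fixed-size pool; returns elapsed milliseconds until all finish.
    public static long handleWithPool(int batches, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        for (int i = 0; i < batches; i++) {
            pool.execute(AsyncHandlingDemo::handleBatch);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("inline ms: " + handleInline(8));
        System.out.println("pooled ms: " + handleWithPool(8, 4));
    }
}
```

With eight batches and four worker threads, the pooled run should finish in roughly a quarter of the inline time, because the handlers overlap instead of running one after another. Exact numbers depend on the machine.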