Kafka多個消費者的小例子
阿新 • • 發佈:2020-11-29
Kafka多個消費者的小例子
public class FirstMultiConsumerThreadDemo2 { public static final String brokerList = "10.211.55.3:9092"; public static final String topic = "topic-demo"; public static final String groupId = "group.demo"; public static Properties initConfig() { Properties props = new Properties(); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // props.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); return props; } public static void main(String[] args) { Properties props = initConfig(); KafkaConsumerThread consumerThread = new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors()); consumerThread.start(); } public static class KafkaConsumerThread extends Thread { private KafkaConsumer<String,String> kafkaConsumer; private ExecutorService executorService; private int threadNumber; public KafkaConsumerThread(Properties props,String topic,int threadNumber) { this.kafkaConsumer = new KafkaConsumer<>(props); this.kafkaConsumer.subscribe(Collections.singletonList(topic)); this.threadNumber = threadNumber; executorService = new ThreadPoolExecutor(threadNumber,threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy()); } @Override public void run() { try { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); if (!records.isEmpty()) { //將一批訊息,即records封裝成任務類,提交給執行緒池 //通常這一步最為耗時,通過非同步的方式,降低處理業務邏輯所耗費的時間 executorService.submit(new RecordHandler(records)); } } } catch (Exception e) { e.printStackTrace(); } finally { kafkaConsumer.close(); } } } public static class RecordHandler extends Thread{ public final ConsumerRecords<String,String> records; public RecordHandler(ConsumerRecords<String,String> records){ this.records = records; } @Override public void run() { //處理records } } }
RecordHandler
類是用來處理訊息的,注意執行緒池的最後一個引數設定的是:CallerrunsPolicy
,這樣可以防止執行緒池的總體消費能力跟不上poll()的能力,從而導致異常現象的發生。
一般而言,poll()
拉取訊息的速度是相當快的,而整體消費的瓶頸也正是在處理訊息這一塊,如果我們通過非同步的方式,就能 帶動整體消費效能的提升。
如下圖:
將處理訊息改成多執行緒的實現方式。