1. 程式人生 > >kafka消費者監聽資料原理

kafka消費者監聽資料原理

kafka確實是一個很牛逼的訊息中介軟體。基本上是訊息中介軟體中資料最快吞吐量最高的分散式訊息中介軟體了。
由於公司對kafka全封裝了,直接呼叫api就可以了。但是本人對kafka很感興趣,就先看了下kafka監聽topic裡的新增的訊息。
看了下原始碼其實很簡單。

public class Consumer{

    private static final KafkaConsumer<String, String> consumer;
    private static ExecutorService executors;

// 消費者從partition裡取資料時,需指定的一系列引數
public ConsumerHandler(ConsumerProperty consumerProperty, List<String> topics) { Properties props = new Properties(); props.put("bootstrap.servers", consumerProperty.getBrokerList()); props.put("group.id", consumerProperty.getGroupId()); props.put("enable.auto.commit"
, consumerProperty.getEnableAutoCommit()); props.put("auto.commit.interval.ms", consumerProperty.getAutoCommitInterval()); props.put("session.timeout.ms", consumerProperty.getSessionTimeout()); props.put("key.deserializer", consumerProperty.getKeySerializer()); props.put("value.deserializer"
, consumerProperty.getValueSerializer()); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(topics); } public void execute(int threads) { executors = new ThreadPoolExecutor(threads, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy()); Thread t = new Thread(new Runnable(){//啟動一個子執行緒來監聽kafka訊息 public void run(){ while (true) { //採用迴圈不斷從partition裡撈資料 ConsumerRecords<String, String> records = consumer.poll(200); for (final ConsumerRecord record : records) { System.out.println("監聽到kafka訊息。。。。。。"); executors.submit(new ConsumerWorker(record)); } } }}); t.start(); } public void shutdown() { if (consumer != null) { consumer.close(); } if (executors != null) { executors.shutdown(); } try { if (!executors.awaitTermination(10, TimeUnit.SECONDS)) { System.out.println("Timeout.... Ignore for this case"); } } catch (InterruptedException ignored) { System.out.println("Other thread interrupted this shutdown, ignore for this case."); Thread.currentThread().interrupt(); } } }

看了原始碼得知:開啟一個執行緒池ThreadPoolExecutor,for迴圈建立一個長連線,每200毫秒去kafka伺服器拉取訊息,每拉到一個訊息,就分配給一個執行緒類ConsumerWorker去處理這個訊息