Kafka consumer在項目中的多線程處理方式
阿新 • • 發佈:2018-07-02
-m all ade exec 線程處理 rop while log 安全 對於KafkaConsumer而言,它不像KafkaProducer,不是線程安全的,狀態是在consumer中維護的,所以實現時要註意多線程的使用,一般有2種使用方法:
1:每個Consumer有自己的線程,consumer去拉取數據,並對數據處理,這種方式比較簡單,易於實現,容易順序處理消息
2:消費者處理者方式,創建一個線程池,在consumer拉取數據後,由線程池來中的線程來處理數據,把拉取數據與處理數據解耦,但數據處理有可能破壞partition的消息順序
從Kafka 文檔中我們也可以查到有關consumer多線程的處理方式
項目實踐:
下圖是項目中對consumer的具體應用,雖然也使用了線程池,但其實還是上述第一種方式,線程池在此只是用於啟動consumer的運行:描述:
ConsumerGroup類:這對應於消費組,在上面第1步將創建一個監聽對象,其將被傳入到ConsumerGroup對象的創建過程中,在cg中將創建一個RunnableConsumer的對象列表(list),也就是上圖第3步,列表中的consumer對象的數量將對應所期望的在Group組中consumer的數量。同時創建一個線程池對象executor,此處線程池的數量和consumer的數量一致RunnableConsumer類:這是一個線程類,實現了Runnable接口,裏面創建了一個KafkaConsumer對象,線程啟動程序中執行對topic的訂閱,並拉取消息
publicclass RunnableConsumer<K,V> implements Runnable { private Consumer<K,V> consumer; private final IConsumerListener<ConsumerRecords<K,V>> listener; private RunnableConsumer(final IConsumerListener<ConsumerRecords<K,V>> listener, Properties... props) {this.consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass); this.listener = listener; } public void run() { try { consumer.subscribe(topics); while (true) { ConsumerRecords<K,V> records = null; try { //now handle any new record(s) records = consumer.poll(1000); if(records != null && records.count() > 0) { listener.notify(records); } } catch(WakeupException wex) { LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:",wex); } } } catch (Throwable e) { // ignore as we are waking up from the poll, we need to cleanly shutdown our consumer LOGGER.error("getting non-recoverable throwable: ", e); throw e; } finally { //TODO: need to check on consumer closing that any outstanding offset commit is done. //otherwise we need to manually do it here. processCommit(SyncMode.SYNC); LOGGER.info("Trying to close Kafka consumer, ConsumerGroup.isRunning: {}", ConsumerGroup.this.isRunning); consumer.close(); } } }
Listener類:這是一個監聽類,用於實際處理某一topic消息,先創建監聽對象,在創建cg時,註冊到RunnerConsumer類中,如果consumer拉取到消息,則將消息通知監聽類去具體處理,不同的業務需要定義不同的業務監聽類
修改為第二種方式
如果想使用第二種方式,將數據的處理從consumer中解耦出來,可以將上面的listener修改為一個線程類,在consumer中有拉取到消息,則從線程池中取出線程處理數據,這種方式的一個最大的問題,就是如何保證消息是按順序處理的,例如,如果一個partition中先後有2條消息,當consumer poll到消息後,將提交到2個線程處理,這就無法保證順序處理,需要額外的線程同步處理機制。同時因為不需要在consumer中對數據進行處理,consumer的性能也提高了,而且避免了數據處理超時,consumer Rebalance等潛在問題
records = consumer.poll(1000); if(records != null && records.count() > 0) { executor.submit(new listener(records)); }
參考:
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded
https://howtoprogram.xyz/2016/05/29/create-multi-threaded-apache-kafka-consumer/
Kafka consumer在項目中的多線程處理方式