1. 程式人生 > >Kafka consumer在項目中的多線程處理方式

Kafka consumer在項目中的多線程處理方式

-m all ade exec 線程處理 rop while log 安全

對於KafkaConsumer而言,它不像KafkaProducer,不是線程安全的,狀態是在consumer中維護的,所以實現時要註意多線程的使用,一般有2種使用方法: 1:每個Consumer有自己的線程,consumer去拉取數據,並對數據處理,這種方式比較簡單,易於實現,容易順序處理消息 2:消費者處理者方式,創建一個線程池,在consumer拉取數據後,由線程池來中的線程來處理數據,把拉取數據與處理數據解耦,但數據處理有可能破壞partition的消息順序 從Kafka 文檔中我們也可以查到有關consumer多線程的處理方式 技術分享圖片

項目實踐:

下圖是項目中對consumer的具體應用,雖然也使用了線程池,但其實還是上述第一種方式,線程池在此只是用於啟動consumer的運行: 技術分享圖片

描述:

ConsumerGroup類:這對應於消費組,在上面第1步將創建一個監聽對象,其將被傳入到ConsumerGroup對象的創建過程中,在cg中將創建一個RunnableConsumer的對象列表(list),也就是上圖第3步,列表中的consumer對象的數量將對應所期望的在Group組中consumer的數量。同時創建一個線程池對象executor,此處線程池的數量和consumer的數量一致

RunnableConsumer類:這是一個線程類,實現了Runnable接口,裏面創建了一個KafkaConsumer對象,線程啟動程序中執行對topic的訂閱,並拉取消息

public
class RunnableConsumer<K,V> implements Runnable { private Consumer<K,V> consumer; private final IConsumerListener<ConsumerRecords<K,V>> listener; private RunnableConsumer(final IConsumerListener<ConsumerRecords<K,V>> listener, Properties... props) {
this.consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass); this.listener = listener; } public void run() { try { consumer.subscribe(topics); while (true) { ConsumerRecords<K,V> records = null; try { //now handle any new record(s) records = consumer.poll(1000); if(records != null && records.count() > 0) { listener.notify(records); } } catch(WakeupException wex) { LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:",wex); } } } catch (Throwable e) { // ignore as we are waking up from the poll, we need to cleanly shutdown our consumer LOGGER.error("getting non-recoverable throwable: ", e); throw e; } finally { //TODO: need to check on consumer closing that any outstanding offset commit is done. //otherwise we need to manually do it here. processCommit(SyncMode.SYNC); LOGGER.info("Trying to close Kafka consumer, ConsumerGroup.isRunning: {}", ConsumerGroup.this.isRunning); consumer.close(); } } }

Listener類:這是一個監聽類,用於實際處理某一topic消息,先創建監聽對象,在創建cg時,註冊到RunnerConsumer類中,如果consumer拉取到消息,則將消息通知監聽類去具體處理,不同的業務需要定義不同的業務監聽類

修改為第二種方式

如果想使用第二種方式,將數據的處理從consumer中解耦出來,可以將上面的listener修改為一個線程類,在consumer中有拉取到消息,則從線程池中取出線程處理數據,這種方式的一個最大的問題,就是如何保證消息是按順序處理的,例如,如果一個partition中先後有2條消息,當consumer poll到消息後,將提交到2個線程處理,這就無法保證順序處理,需要額外的線程同步處理機制。同時因為不需要在consumer中對數據進行處理,consumer的性能也提高了,而且避免了數據處理超時,consumer Rebalance等潛在問題

records = consumer.poll(1000);
if(records != null && records.count() > 0) {
       executor.submit(new listener(records));
}

參考:

http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded

https://howtoprogram.xyz/2016/05/29/create-multi-threaded-apache-kafka-consumer/

Kafka consumer在項目中的多線程處理方式