springboot + @KafkaListener 手動提交及消費能力優化
阿新 • • 發佈:2018-11-19
public class KonkaKafkaListener { private final static Logger LOGGER = LoggerFactory.getLogger(KonkaKafkaListener.class); @Autowired private RouterService routerService; @KafkaListener(containerFactory = "kafkaListenerContainerFactory", topics = "test") public void consumerListener(List<ConsumerRecord> consumerRecords, Acknowledgment ack) { ack.acknowledge();//直接提交offset if (consumerRecords.size() > 0) { PartitionCounter.addCounter(consumerRecords.get(0).partition(), consumerRecords.size()); } Iterator<ConsumerRecord> iterator = consumerRecords.iterator(); while (iterator.hasNext()) { ConsumerRecord consumerRecord = iterator.next(); String key = consumerRecord.key().toString(); KafkaLogMessage kafkaLogMessage = (KafkaLogMessage) consumerRecord.value(); if (kafkaLogMessage == null) { continue; } routerService.handleKafkaMessage(key, kafkaLogMessage); } }
#消費者併發啟動個數(對應分割槽個數)每個listener方法 kafka.concurrency=10
將啟動器的併發提高到和分割槽數一致
kafka 消費能力的提高
1、自動提交的實現
2、autoCommitIntervalMs 設定每次隔多久自動提交offset
3、kafka.max.poll.interval.ms 和 sessionTimeout
max.poll.interval.ms ,它表示最大的poll資料間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該consumer處於 livelock狀態。就會將該consumer退出consumer group
之後就會觸發導致reblance
·heartbeat.interval.ms
心跳間隔。心跳是在consumer與coordinator之間進行的。心跳是確定consumer存活,加入或者退出group的有效手段。
這個值必須設定的小於session.timeout.ms,因為:
當Consumer由於某種原因不能發Heartbeat到coordinator時,並且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。
通常設定的值要低於session.timeout.ms的1/3。
預設值是:3000 (3s)
·session.timeout.ms
Consumer session 過期時間。這個值必須設定在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。
其預設值是:10000 (10 s)