史上最差的kafka教程第五天(kafka避免重複消費與避免異常處理死迴圈)
阿新 • • 發佈:2020-08-13
kafka避免重複消費實際上是修改offset的提交方式。但是如果前一段消費失敗通常是採取繼續傳送到該topic下,但是呼叫的服務已經掛了,如果
服務一直處於異常意味著,要不停的重複失敗的資料。會變成死迴圈,不停報錯,浪費系統資源。
解決方案:將處理失敗的資訊傳送到指定topic底下,該topic監聽器採用定時器開關,每天定時去讀取操作失敗的資料。
具體操作:
- 設定監聽器啟動方式或者不設定
- 設定需要設定定時開關監聽器的id
- 利用定時器定時開關指定監聽器
@Component public class TaskListener{ private static final Logger log= LoggerFactory.getLogger(TaskListener.class); @Autowired private KafkaListenerEndpointRegistry registry; @Autowired private ConsumerFactory consumerFactory;
//如果在kafkaconfiguration裡面有設定過的話,不需要再設定 @Bean public ConcurrentKafkaListenerContainerFactory delayContainerFactory() { ConcurrentKafkaListenerContainerFactory container= new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(consumerFactory); //禁止自動啟動,這個是全域性監聽器的。可以根據具體業務設定 container.setAutoStartup(false); return container; } @KafkaListener(id = "durable", topics = "topic.durable",containerFactory = "delayContainerFactory")public void durableListener(String data) { //這裡做資料持久化的操作 log.info("topic.quick.durable receive : " + data); }
}
@Component @EnableScheduling @Slf4j public class TaskDemo { @Autowired private KafkaListenerEndpointRegistry registry; @Scheduled(cron = "0/30 * * * * ?") public void startRun(){ //判斷監聽容器是否啟動,未啟動則將其啟動 if (!registry.getListenerContainer("durable").isRunning()) { log.info("開啟監聽器"); registry.getListenerContainer("durable").start(); } } @Scheduled(cron = "0/10 * * * * ?") public void stopRun(){ //判斷監聽容器是否啟動,未啟動則將其啟動 if (registry.getListenerContainer("durable").isRunning()) { log.info("關閉監聽器"); registry.getListenerContainer("durable").stop(); } }