1. 程式人生 > 實用技巧 >史上最差的kafka教程第五天(kafka避免重複消費與避免異常處理死迴圈)

史上最差的kafka教程第五天(kafka避免重複消費與避免異常處理死迴圈)

kafka避免重複消費實際上是修改offset的提交方式。但是如果前一段消費失敗通常是採取繼續傳送到該topic下,但是呼叫的服務已經掛了,如果

服務一直處於異常意味著,要不停的重複失敗的資料。會變成死迴圈,不停報錯,浪費系統資源。

解決方案:將處理失敗的資訊傳送到指定topic底下,該topic監聽器採用定時器開關,每天定時去讀取操作失敗的資料。

具體操作:

  1. 設定監聽器啟動方式或者不設定
  2. 設定需要設定定時開關監聽器的id
  3. 利用定時器定時開關指定監聽器
@Component
public class TaskListener{

    private static final Logger log= LoggerFactory.getLogger(TaskListener.class
); @Autowired private KafkaListenerEndpointRegistry registry; @Autowired private ConsumerFactory consumerFactory;
//如果在kafkaconfiguration裡面有設定過的話,不需要再設定 @Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() { ConcurrentKafkaListenerContainerFactory container
= new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(consumerFactory); //禁止自動啟動,這個是全域性監聽器的。可以根據具體業務設定 container.setAutoStartup(false); return container; } @KafkaListener(id = "durable", topics = "topic.durable",containerFactory = "delayContainerFactory")
public void durableListener(String data) { //這裡做資料持久化的操作 log.info("topic.quick.durable receive : " + data); }
}

@Component
@EnableScheduling
@Slf4j
public class TaskDemo {

    @Autowired
    private KafkaListenerEndpointRegistry registry;


    @Scheduled(cron = "0/30 * * * * ?")
    public void startRun(){
        //判斷監聽容器是否啟動,未啟動則將其啟動
        if (!registry.getListenerContainer("durable").isRunning()) {
            log.info("開啟監聽器");
            registry.getListenerContainer("durable").start();
        }
    }


    @Scheduled(cron = "0/10 * * * * ?")
    public void stopRun(){
        //判斷監聽容器是否啟動,未啟動則將其啟動
        if (registry.getListenerContainer("durable").isRunning()) {
            log.info("關閉監聽器");
            registry.getListenerContainer("durable").stop();
        }
      
    }