1. 程式人生 > 實用技巧 >spring kafka之如何批量給topic加字首

spring kafka之如何批量給topic加字首

最近業務開發部門給我們部門提了一個需求,因為他們開發環境和測試環境共用一套kafka,他們希望我們部門能幫他們實現自動給kafka的topic加上環境字首,比如開發環境,則topic為dev_topic,測試環境,則topic為test_topic,他們kafka客戶端是使用spring-kafka。一開始接到這個需求的時候,我心裡是拒絕的,為啥開發環境和測試環境不分別部署一套kafka,還要那麼麻煩。但老大都答應接這個需求了,作為小羅羅也只能接了

實現思路

1、生產者端

可以通過生產者攔截器,來給topic加字首

2、實現步驟

a、編寫一個生產者攔截器

@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {



    /**
     * 執行在使用者主執行緒中,在訊息被序列化之前呼叫
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {
        log.info("原始topic:{}",record.topic());
        return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),
                record.partition(),record.timestamp(),record.key(), record.value());
    }




    /**
     * 在訊息被應答之前或者訊息傳送失敗時呼叫,通常在producer回撥邏輯觸發之前,執行在produer的io執行緒中
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
      log.info("實際topic:{}",metadata.topic());
    }


    /**
     *  清理工作
     */
    @Override
    public void close() {
    }


    /**
     * 初始化工作
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }

b、配置攔截器

kafka:
    producer:
      # 生產者攔截器配置
      properties:
        interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor

c、測試

2、消費者端

這個就稍微有點難搞了,因為業務開發部門他們是直接用@KafkaListener的註解,形如下

 @KafkaListener(id = "msgId",topics = {Constant.TOPIC})

像這種也沒啥好的辦法,就只能通過原始碼了,通過原始碼可以發現在如下地方

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

會把@KafkaListener的值賦值給消費者,如果對spring有了解的朋友,可能會知道postProcessAfterInitialization是spring後置處理器的方法,主要用來bean初始化後的一些操作,既然我們知道@KafkaListener會在bean初始化後再進行賦值,那我們就可以在bean初始化前,修改掉@KafkaListener的值。具體實現如下

@Component
public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {

    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        List<String> packageNames = AutoConfigurationPackages.get(beanFactory);

        for (String packageName : packageNames) {
            Reflections reflections = new Reflections(new ConfigurationBuilder()

www.wyuleezc.cn
www.baihuayllpt.cn
www.huizhonggjzc.cn
www.jintianxuesha.com


                    .addScanners(new SubTypesScanner()) // 新增子類掃描工具
                    .addScanners(new FieldAnnotationsScanner()) // 新增 屬性註解掃描工具
                    .addScanners(new MethodAnnotationsScanner() ) // 新增 方法註解掃描工具
                    .addScanners(new MethodParameterScanner() ) // 新增方法引數掃描工具
            );

            Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
            if(!CollectionUtils.isEmpty(methodSet)){
                for (Method method : methodSet) {
                    KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                    changeTopics(kafkaListener);
                }
            }
        }

    }


    private void changeTopics(KafkaListener kafkaListener) throws Exception{
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
        memberValuesField.setAccessible(true);
        Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler);
        String[] topics = (String[])memberValues.get("topics");
        System.out.println("修改前topics:" + Lists.newArrayList(topics));
        for (int i = 0; i < topics.length; i++) {
            topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
        }
        memberValues.put("topics", topics);
        System.out.println("修改後topics:" + Lists.newArrayList(kafkaListener.topics()));

    }
}

測試

總結

雖然實現了動態修改topic,但我還是覺得topic不要隨便改變,有條件的話,kafka還是得基於物理環境隔離,其次真的客觀條件不允許,要動態變更topic,則需做好topic動態變更宣導以及相關wiki的編寫,不然很容易掉坑