1. 程式人生 > 其它 >kafka07-spring整合kafka

kafka07-spring整合kafka

版本資訊：kafka 1.0.2

spring-kafka高版本相容低版本

pom

<!-- Dependencies for the Spring + Kafka demo. No <version> elements are declared here;
     presumably they are managed by a Spring Boot parent/BOM that is not shown (confirm). -->
<dependencies>
        <!-- Web starter: needed for the @RestController endpoints that trigger message sends. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring for Apache Kafka: supplies the org.springframework.kafka classes used below
             (KafkaTemplate, @KafkaListener, SendResult). -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Test-scoped dependencies; not part of the runtime classpath. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

KafkaProducerController

package com.lew.sp.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

/**
 * REST controller that publishes messages to Kafka through the injected {@link KafkaTemplate}.
 *
 * <p>Naming caveat: the endpoint names do not match their behavior.
 * {@code /asyncSendMess} blocks the request thread until the send completes, whereas
 * {@code /noAsyncSendMess} returns immediately and reports the outcome from a callback.
 * The paths and method names are kept unchanged so existing callers keep working.
 *
 * @author llewcg
 */
@RestController
public class KafkaProducerController {

    /** Topic that every endpoint of this controller publishes to. */
    private static final String TOPIC = "gc_spring_1";

    /** Fixed record key used for every message sent by this controller. */
    private static final Integer RECORD_KEY = 1;

    @Autowired
    KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Sends {@code msg} and waits for the result before answering (blocking send).
     *
     * @param msg message payload taken from the URL path
     * @return {@code "success"} once the send has completed, {@code "failure"} if the send
     *         failed or the waiting thread was interrupted
     */
    @RequestMapping("/asyncSendMess/{msg}")
    public String asyncSendMess(@PathVariable String msg) {
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(newRecord(msg));
        try {
            // future.get() blocks until the send has finished or failed.
            SendResult<Integer, String> sendResult = future.get();
            printMetadata(sendResult.getRecordMetadata());
            return "success";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // Reaching this point means the send did not complete; do not report success.
        return "failure";
    }


    /**
     * Sends {@code msg} without waiting for the result; the outcome is printed from a callback.
     *
     * @param msg message payload taken from the URL path
     * @return {@code "success"} as soon as the send has been handed to the template; this does
     *         not indicate that the record was actually delivered
     */
    @RequestMapping("/noAsyncSendMess/{msg}")
    public String noAsyncSendMess(@PathVariable String msg) {
        // Send the caller's payload (previously a hard-coded literal was sent and msg was ignored).
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(newRecord(msg));
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("傳送失敗");
                // Keep the cause visible instead of discarding it.
                throwable.printStackTrace();
            }

            @Override
            public void onSuccess(SendResult<Integer, String> sendResult) {
                printMetadata(sendResult.getRecordMetadata());
            }
        });
        return "success";
    }

    /** Builds the record for {@code msg} using the controller's fixed topic and key. */
    private static ProducerRecord<Integer, String> newRecord(String msg) {
        return new ProducerRecord<>(TOPIC, RECORD_KEY, msg);
    }

    /** Prints the topic and offset of a completed send, tab-separated. */
    private static void printMetadata(RecordMetadata recordMetadata) {
        System.out.println(recordMetadata.topic() + "\t" + recordMetadata.offset());
    }
}

CusConsumer

package com.lew.sp.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka listener component that prints each record received on the {@code gc-spring-02} topic.
 *
 * @author llewcg
 */
@Component
public class CusConsumer {

    /**
     * Prints the topic, partition, offset, key and value of the received record, tab-separated,
     * on a single line of standard output.
     *
     * <p>NOTE(review): the producer controller shown in this article publishes to
     * {@code gc_spring_1}, while this listener subscribes to {@code gc-spring-02} — confirm
     * which topic is intended.
     *
     * @param consumerRecord the received record; a {@code null} record is silently ignored
     */
    @KafkaListener(topics = "gc-spring-02")
    public void consumerMess(ConsumerRecord<Integer, String> consumerRecord) {
        // A plain null guard replaces wrapping the argument in an Optional just to test presence.
        if (consumerRecord == null) {
            return;
        }
        System.out.println(consumerRecord.topic()
                + "\t" + consumerRecord.partition()
                + "\t" + consumerRecord.offset()
                + "\t" + consumerRecord.key()
                + "\t" + consumerRecord.value());
    }
}

KafkaConfig

修改自動注入的配置

package com.lew.sp.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that adjusts the default Kafka setup by declaring topic beans.
 *
 * <p>Presumably the declared {@link NewTopic} beans are picked up by the application's
 * Kafka admin client to create the topics — confirm against the Spring Kafka documentation.
 *
 * @author lewcg
 */
@Configuration
public class KafkaConfig {

    /** Replication factor shared by both topic definitions. */
    private static final short REPLICATION_FACTOR = 1;

    /**
     * Topic definition for {@code ntp-1} with 2 partitions.
     *
     * @return the topic definition bean
     */
    @Bean
    public NewTopic topic1() {
        final String topicName = "ntp-1";
        final int partitionCount = 2;
        return new NewTopic(topicName, partitionCount, REPLICATION_FACTOR);
    }

    /**
     * Topic definition for {@code ntp-02} with 3 partitions.
     *
     * @return the topic definition bean
     */
    @Bean
    public NewTopic topic2() {
        final String topicName = "ntp-02";
        final int partitionCount = 3;
        return new NewTopic(topicName, partitionCount, REPLICATION_FACTOR);
    }

    /*
     * Disabled example: supplying a custom KafkaAdmin built from an explicit config map.
     *
     * @Bean
     * public KafkaAdmin newAdmin(){
     *     Map<String, Object> config = new HashMap<>();
     *     config.put("xxx","xxx");
     *     return  new KafkaAdmin(config);
     * }
     */

    /*
     * Disabled example: supplying a custom KafkaTemplate whose config map overrides
     * the existing producer settings.
     *
     * @Bean
     * public KafkaTemplate<Integer, String> newTemplate(ProducerFactory<Integer, String> producerFactory){
     *     Map<String, Object> config = new HashMap<>();
     *     // override the existing settings
     *     config.put("xxx","xxx");
     *     return new KafkaTemplate<Integer, String>(producerFactory, config);
     * }
     */
}

演示效果

作者:gcwell
版權:本文版權歸作者和部落格園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段宣告;必須在文章中給出原文連線;否則必究法律責任