kafka07-spring整合kafka
阿新 • • 發佈:2021-08-15
版本資訊:kafka 1.0.2
spring-kafka 的高版本可相容低版本的 kafka
pom
<!-- NOTE(review): no <version> elements are declared here; presumably they are managed by a Spring Boot parent/BOM. Confirm against the full pom. -->
<dependencies>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Test-scoped dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
KafkaProducerController
package com.lew.sp.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

/**
 * REST controller that publishes the path-variable message to Kafka via {@link KafkaTemplate}.
 *
 * <p>NOTE(review): the endpoint names are the reverse of what the code does.
 * {@code asyncSendMess} blocks on the send future, while {@code noAsyncSendMess}
 * returns immediately and reports the outcome through a callback. The names and
 * URL paths are kept because they are this controller's public interface.
 *
 * @author llewcg
 */
@RestController
public class KafkaProducerController {

    /** Topic every record from this controller is sent to. */
    private static final String TOPIC = "gc_spring_1";

    /** Record key used for every message. */
    private static final Integer KEY = 1;

    @Autowired
    KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Sends {@code msg} and waits for the send to complete before responding.
     *
     * @param msg message body taken from the URL path
     * @return {@code "success"} once the send future completed normally,
     *         {@code "failure"} if the send failed or the wait was interrupted
     */
    @RequestMapping("/asyncSendMess/{msg}")
    public String asyncSendMess(@PathVariable String msg) {
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send(new ProducerRecord<Integer, String>(TOPIC, KEY, msg));
        try {
            SendResult<Integer, String> sendResult = future.get();
            printMetadata(sendResult.getRecordMetadata());
            return "success";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return "failure";
        } catch (ExecutionException e) {
            e.printStackTrace();
            return "failure";
        }
    }

    /**
     * Sends {@code msg} without waiting; the outcome is printed from a callback.
     *
     * @param msg message body taken from the URL path
     * @return always {@code "success"}; it only acknowledges that the send was
     *         submitted, because the result is not known when this method returns
     */
    @RequestMapping("/noAsyncSendMess/{msg}")
    public String noAsyncSendMess(@PathVariable String msg) {
        // Send the caller's message; the original sent a fixed literal and ignored msg.
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send(new ProducerRecord<Integer, String>(TOPIC, KEY, msg));
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("傳送失敗");
                // Keep the cause visible instead of discarding it.
                throwable.printStackTrace();
            }

            @Override
            public void onSuccess(SendResult<Integer, String> sendResult) {
                printMetadata(sendResult.getRecordMetadata());
            }
        });
        return "success";
    }

    /** Prints the topic and offset of a sent record, separated by a tab. */
    private static void printMetadata(RecordMetadata recordMetadata) {
        System.out.println(recordMetadata.topic() + "\t" + recordMetadata.offset());
    }
}
CusConsumer
package com.lew.sp.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Listener component that prints every record received from the
 * {@code gc-spring-02} topic.
 *
 * @author llewcg
 */
@Component
public class CusConsumer {

    /**
     * Prints the record's topic, partition, offset, key and value on one
     * tab-separated line. A {@code null} record is ignored.
     *
     * @param consumerRecord the record handed to this listener method
     */
    @KafkaListener(topics = "gc-spring-02")
    public void consumerMess(ConsumerRecord<Integer, String> consumerRecord) {
        Optional.ofNullable(consumerRecord)
                .map(CusConsumer::describe)
                .ifPresent(System.out::println);
    }

    /** Builds the tab-separated line for one record; null fields render as "null". */
    private static String describe(ConsumerRecord<Integer, String> rec) {
        return String.join("\t",
                rec.topic(),
                String.valueOf(rec.partition()),
                String.valueOf(rec.offset()),
                String.valueOf(rec.key()),
                rec.value());
    }
}
KafkaConfig
修改自動注入的配置
package com.lew.sp.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that adjusts the default Kafka setup by declaring
 * additional beans.
 *
 * <p>NOTE(review): the {@code NewTopic} beans are presumably picked up by the
 * application's Kafka admin support to create the topics. Confirm against the
 * Spring Kafka version in use.
 *
 * @author lewcg
 */
@Configuration
public class KafkaConfig {

    /** Replication factor used for every topic declared in this class. */
    private static final short REPLICATION_FACTOR = 1;

    /** Declares topic {@code ntp-1} with 2 partitions. */
    @Bean
    public NewTopic topic1() {
        return singleReplicaTopic("ntp-1", 2);
    }

    /** Declares topic {@code ntp-02} with 3 partitions. */
    @Bean
    public NewTopic topic2() {
        return singleReplicaTopic("ntp-02", 3);
    }

    /** Creates a topic definition with the given name and partition count. */
    private static NewTopic singleReplicaTopic(String name, int partitions) {
        return new NewTopic(name, partitions, REPLICATION_FACTOR);
    }

    /* Example (disabled): replace the KafkaAdmin bean with custom settings.
    @Bean
    public KafkaAdmin newAdmin(){
        Map<String, Object> config = new HashMap<>();
        config.put("xxx","xxx");
        return new KafkaAdmin(config);
    }*/

    /* Example (disabled): replace the KafkaTemplate bean with overridden settings.
    @Bean
    public KafkaTemplate<Integer, String> newTemplate(ProducerFactory<Integer, String> producerFactory){
        Map<String, Object> config = new HashMap<>();
        // override the existing settings
        config.put("xxx","xxx");
        return new KafkaTemplate<Integer, String>(producerFactory, config);
    }*/
}
演示效果
版權:本文版權歸作者和部落格園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段宣告,且必須在文章中給出原文連結,否則必追究法律責任