1. 程式人生 > 其它 >spring boot kafka_SpringBoot整合Kafka

spring boot kafka_SpringBoot整合Kafka

技術標籤:spring boot kafkaspringboot整合kafka

SpringBoot整合Kafka

本篇主要講解SpringBoot 如何整合Kafka ,並且簡單的 編寫了一個Demo 來測試 傳送和消費功能

前言

選擇的版本如下:

springboot :2.3.4.RELEASE

spring-kafka :2.5.6.RELEASE

kafka : 2.5.1

zookeeper : 3.4.14

本Demo 使用的是 SpringBoot 比較高的版本 SpringBoot 2.3.4.RELEASE 它會引入 spring-kafka 2.5.6 RELEASE ,對應了版本關係中的

Spring Boot 2.3 users should use 2.5.x (Boot dependency management will use the correct version).

spring和 kafka 的版本 關係

https://spring.io/projects/spring-kafka

1.搭建Kafka 和 Zookeeper 環境

搭建kafka 和 zookeeper 環境 並且啟動 它們

2.建立Demo 專案引入spring-kafka

2.1 pom 檔案

<dependency>  <groupId>org.springframework.bootgroupId>  <artifactId>spring-boot-starter-webartifactId>dependency><dependency>  <groupId>org.springframework.kafkagroupId>  <artifactId>spring-kafkaartifactId>dependency><dependency>  <groupId>com.google.code.gsongroupId>  <artifactId>gsonartifactId>dependency>

2.2 配置application.yml

spring:  kafka:   bootstrap-servers: 192.168.25.6:9092 #bootstrap-servers:連線kafka的地址,多個地址用逗號分隔   consumer:    group-id: myGroup    enable-auto-commit: true    auto-commit-interval: 100ms    properties:     session.timeout.ms: 15000    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    auto-offset-reset: earliest   producer:    retries: 0 #若設定大於0的值,客戶端會將傳送失敗的記錄重新發送    batch-size: 16384 #當將多個記錄被髮送到同一個分割槽時, Producer 將嘗試將記錄組合到更少的請求中。這有助於提升客戶端和伺服器端的效能。這個配置控制一個批次的預設大小(以位元組為單位)。16384是預設的配置    buffer-memory: 33554432 #Producer 用來緩衝等待被髮送到伺服器的記錄的總位元組數,33554432是預設配置    key-serializer: org.apache.kafka.common.serialization.StringSerializer #關鍵字的序列化類    value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化類

2.3 定義訊息體Message

/** * @author johnny * @create 2020-09-23 上午9:21 **/@Datapublic class Message {  private Long id;  private String msg;  private Date sendTime;}

2.4 定義KafkaSender

主要利用 KafkaTemplate 來發送訊息 ,將訊息封裝成Message 並且進行 轉化成Json串 傳送到Kafka中

@[email protected] class KafkaSender {  private final KafkaTemplate<String, String> kafkaTemplate;  //構造器方式注入  kafkaTemplate  public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {    this.kafkaTemplate = kafkaTemplate;   }  private Gson gson = new GsonBuilder().create();  public void send(String msg) {    Message message = new Message();    message.setId(System.currentTimeMillis());    message.setMsg(msg);    message.setSendTime(new Date());    log.info("【++++++++++++++++++ message :{}】", gson.toJson(message));    //對 topic =  hello2 的傳送訊息    kafkaTemplate.send("hello2",gson.toJson(message));   }}

2.5 定義KafkaConsumer

在監聽的方法上通過註解配置一個監聽器即可,另外就是指定需要監聽的topic

kafka的訊息在接收端會被封裝成ConsumerRecord物件返回,它內部的value屬性就是實際的訊息。

@[email protected] class KafkaConsumer {  @KafkaListener(topics = {"hello2"})  public void listen(ConsumerRecord, ?> record) {    Optional.ofNullable(record.value())         .ifPresent(message -> {          log.info("【+++++++++++++++++ record = {} 】", record);          log.info("【+++++++++++++++++ message = {}】", message);         });   }}

3.測試 效果

提供一個 Http介面呼叫 KafkaSender 去傳送訊息

3.1 提供Http 測試介面

@[email protected] class TestController {  @Autowired  private KafkaSender kafkaSender;  @GetMapping("sendMessage/{msg}")  public void sendMessage(@PathVariable("msg") String msg){    kafkaSender.send(msg);   }}

3.2 啟動專案

監聽8080 埠

KafkaMessageListenerContainer 中有 consumer group = myGroup 有一個 監聽 hello2-0 topic 的 消費者

d6f3e23207d1f7f2eac7b1c94ccc4152.png

3.3 呼叫Http介面

http://localhost:8080/sendMessage/KafkaTestMsg

167676e9b244d0861f35d6b1269ae8f9.png

至此 SpringBoot整合Kafka 結束 。。