手把手教你Spring Boot2.x整合kafka
阿新 • • 發佈:2021-03-02
#首先得自己搭建一個kafka,搭建教程請自行百度,本人是使用docker搭建了一個單機版的zookeeper+kafka作為演示,文末會有完整程式碼包提供給大家下載參考
![](https://img2020.cnblogs.com/blog/1543487/202103/1543487-20210302195200449-1015025413.png)
#廢話不多說,教程開始
##一、老規矩,先在pom.xml中新增kafka相關依賴
##二、在application.yml中新增相關配置
spring:
#kafka配置
kafka:
#這裡改為你的kafka伺服器ip和埠號
bootstrap-servers: 10.24.19.237:9092
#=============== producer =======================
producer:
#如果該值大於零時,表示啟用重試失敗的傳送次數
retries: 0
#每當多個記錄被髮送到同一分割槽時,生產者將嘗試將記錄一起批量處理為更少的請求,預設值為16384(單位位元組)
batch-size: 16384
#生產者可用於緩衝等待發送到伺服器的記錄的記憶體總位元組數,預設值為3355443
buffer-memory: 33554432
#key的Serializer類,實現類實現了介面org.apache.kafka.common.serialization.Serializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
#value的Serializer類,實現類實現了介面org.apache.kafka.common.serialization.Serializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
consumer:
#用於標識此使用者所屬的使用者組的唯一字串
group-id: test-consumer-group
#當Kafka中沒有初始偏移量或者伺服器上不再存在當前偏移量時該怎麼辦,預設值為latest,表示自動將偏移重置為最新的偏移量
#可選的值為latest, earliest, none
auto-offset-reset: earliest
#消費者的偏移量將在後臺定期提交,預設值為true
enable-auto-commit: true
#如果'enable-auto-commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),預設值為5000。
auto-commit-interval: 100
#金鑰的反序列化器類,實現類實現了介面org.apache.kafka.common.serialization.Deserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#值的反序列化器類,實現類實現了介面org.apache.kafka.common.serialization.Deserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
##三、新增操作kafka的工具類KafkaUtils.java(這裡我只是簡單的封裝了一些方法,大家可以根據需要自行新增需要的方法)
package com.example.study.util;
import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* 操作kafka的工具類
*
* @author [email protected]
* @date 2021/3/2 14:52
*/
@Component
public class KafkaUtils {
@Value("${spring.kafka.bootstrap-servers}")
private String springKafkaBootstrapServers;
private AdminClient adminClient;
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 初始化AdminClient
* '@PostConstruct該註解被用來修飾一個非靜態的void()方法。
* 被@PostConstruct修飾的方法會在伺服器載入Servlet的時候執行,並且只會被伺服器執行一次。
* PostConstruct在建構函式之後執行,init()方法之前執行。
*/
@PostConstruct
private void initAdminClient() {