springboot + kafka 入門例項 入門demo
阿新 • • 發佈:2020-08-20
springboot + kafka 入門例項 入門demo
版本說明
- springboot版本:2.3.3.RELEASE
- kafka服務端版本:kafka_2.12-2.6.0.tgz
- zookeeper服務端版本:apache-zookeeper-3.6.1-bin.tar.gz
例項搭建前提條件
1,搭建好zookeeper服務,本例項zookeeper使用單機偽叢集模式,
192.168.1.126:2181, 192.168.1.126:2182, 192.168.1.126:2183
2,搭建好kafka服務,本例項kafka使用單機偽叢集模式,
192.168.1.126:9092, 192.168.1.126:9093, 192.168.1.126:9094
1. 匯入相關依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>springboot-kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>springboot-kafka-demo</name>
    <description>springboot-kafka-demo</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Version is managed by spring-boot-starter-parent -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- 1.2.83 is the patched 1.2.x release; earlier 1.2.x versions (including
             1.2.54) are affected by known autoType deserialization vulnerabilities. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. yml配置
server:
  port: 8080
  servlet:
    context-path: /
  tomcat:
    uri-encoding: UTF-8

spring:
  kafka:
    # Kafka pseudo-cluster running on a local virtual machine
    bootstrap-servers: 192.168.1.126:9092,192.168.1.126:9093,192.168.1.126:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 65536
      buffer-memory: 524288
      # Custom topics (read by the application through @Value placeholders)
      myTopic1: testTopic1
      myTopic2: testTopic2
    consumer:
      # Default group id; multiple consumer groups are configured later
      group-id: default-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      # Auto-commit is disabled; offsets are committed by spring-kafka instead
      enable-auto-commit: false
      auto-commit-interval: 100
      # Batch consumption: maximum number of records received per poll
      max-poll-records: 20
3. 部分程式碼
訊息實體類
package com.example.demo.entity;
import java.util.Date;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Message {
private Long id;
private String msg;
private Date sendTime;
}
kafka配置類
package com.example.demo.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* Kafka configuration holder: every field is injected from the application
* configuration through a {@code @Value} placeholder, and Lombok's
* {@code @Data} generates the accessors.
*/
@Data
@Configuration
public class KafkaConfiguration {
/**
* Kafka cluster address list, read from {@code spring.kafka.bootstrap-servers}.
*/
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* Kafka consumer group id, read from {@code spring.kafka.consumer.group-id}.
*/
@Value("${spring.kafka.consumer.group-id}")
private String defaultGroupId;
/**
* Position at which consumption starts, read from
* {@code spring.kafka.consumer.auto-offset-reset}.
*/
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
/**
* Whether offsets are committed automatically, read from
* {@code spring.kafka.consumer.enable-auto-commit} (kept as a String).
*/
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
/**
* How often (in milliseconds) consumer offsets are auto-committed to Kafka
* when 'enable.auto.commit' is true; the default is 5000.
*/
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
/**
* Maximum number of records returned by a single poll() call; the default is 500.
*/
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
/**
* Custom topic 1, read from {@code spring.kafka.producer.myTopic1}.
*/
@Value("${spring.kafka.producer.myTopic1}")
private String myTopic1;
/**
* Custom topic 2, read from {@code spring.kafka.producer.myTopic2}.
*/
@Value("${spring.kafka.producer.myTopic2}")
private String myTopic2;
}
消費者監聽類
package com.example.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Consumer 1: listens on the topic configured under
 * {@code spring.kafka.producer.myTopic1}.
 */
@Component
public class ConsumerListener1 {

    /**
     * Prints the received record, then its value, to standard output.
     *
     * @param record the record handed to this listener
     */
    @KafkaListener(topics = "${spring.kafka.producer.myTopic1}")
    public void listen(ConsumerRecord<?,String> record) {
        // Dump the whole record first, then only the payload.
        System.out.println(record);
        System.out.println("消費者1接收到訊息:" + record.value());
    }
}
測試類
package com.example.demo.controller;
import com.alibaba.fastjson.JSON;
import com.example.demo.config.KafkaConfiguration;
import com.example.demo.entity.Message;
import com.example.demo.service.KafkaService;
import com.example.demo.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * REST endpoints for trying out the Kafka producer of this demo.
 *
 * <p>Each endpoint passes a string payload to {@code KafkaService.send} together with
 * one of the topic names exposed by {@code KafkaConfiguration}.
 */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KafkaService kafkaService;

    @Autowired
    private KafkaConfiguration kafkaConfiguration;

    /**
     * Sends a plain text message to topic 1.
     *
     * @param msg text taken from the request path
     * @return a confirmation string that echoes {@code msg}
     */
    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        kafkaService.send(kafkaConfiguration.getMyTopic1(), msg);
        return "生產者傳送訊息給topic1:"+msg;
    }

    /**
     * Sends a generated JSON message to topic 1.
     *
     * @return the JSON string that was sent
     */
    @GetMapping("/send2")
    public String send2() {
        return sendJsonMessage(kafkaConfiguration.getMyTopic1(), "topic1");
    }

    /**
     * Sends a generated JSON message to topic 2.
     *
     * @return the JSON string that was sent
     */
    @GetMapping("/send3")
    public String send3() {
        return sendJsonMessage(kafkaConfiguration.getMyTopic2(), "topic2");
    }

    /**
     * Builds a {@code Message}, serializes it to JSON, logs it and sends it to a topic.
     *
     * <p>Shared by {@link #send2()} and {@link #send3()}, which differ only in the
     * target topic and the label used in the message text and the log line.
     *
     * @param topic      topic name passed to {@code KafkaService.send}
     * @param topicLabel label embedded in the message text and log line, e.g. "topic1"
     * @return the JSON string that was sent
     */
    private String sendJsonMessage(String topic, String topicLabel) {
        Message message = new Message();
        // The current timestamp doubles as the message id.
        message.setId(System.currentTimeMillis());
        message.setMsg("生產者傳送訊息到" + topicLabel + ": " + UUID.getUUID32());
        message.setSendTime(new Date());

        String value = JSON.toJSONString(message);
        log.info("生產者傳送訊息到{} message = {}", topicLabel, value);
        kafkaService.send(topic, value);
        return value;
    }
}
4. 例項執行結果
5. 寫在最後
本例項原始碼:https://gitee.com/jelly_oy/springboot-kafka-demo
本例項採用springboot2.3.3 + zookeeper3.6.1 + kafka2.6.0 進行搭建
如果本專案對你有幫助,歡迎留言評論,歡迎git clone原始碼。