如何在Spring Boot v2.x 中 操作kafka (kafka v1.1.0)
阿新 • • 發佈:2019-02-04
概述
本文采用的是 spring boot 官方文件說的整合方式,具體見 Apache Kafka Support.
思路是通過在 spring boot application.properties 中配置 生產者和消費者的基本資訊,然後spring boot 啟動後會建立 KafkaTemplate 物件,這個物件可以用來發送訊息到Kafka,然後用 @KafkaListener 註解來消費 kafka 裡面的訊息。
版本資訊:
spring boot: 2.0.3.RELEASE
spring-kafka: 2.1.7.RELEAS
kafka: 1.1.0
提示:得先安裝和配置zookper, kafka,並且讓它們能正確執行起來。
Spring Boot 和 Spring for Apache Kafka 整合步驟
- 在pom.xml中引入 Spring Kafka依賴
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
然後 application.properties 配置檔案中加入如下配置:
server.port=8090 ####### kafka ### producer 配置 spring.kafka.producer.bootstrap-servers=192.168.10.48:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer ### consumer 配置 spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092 spring.kafka.consumer.group-id=anuoapp spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.max-poll-records=1 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.listener.concurrency=5
建立 Kafka Producer 生產者
package com.example.anuoapp.kafka;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@Component
public class KafkaProducer {
@Autowired
KafkaTemplate kafkaTemplate;
public void kafkaSend() throws Exception {
UserAccount userAccount=new UserAccount();
userAccount.setCard_name("jk");
userAccount.setAddress("cd");
ListenableFuture send = kafkaTemplate.send("mytopic", "key", JSON.toJSONString(userAccount));
}
}
建立 Kafka Consumer 消費者
package com.example.anuoapp.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = {"mytopic"})
public void jktopic(ConsumerRecord consumerRecord) throws InterruptedException {
System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.value().toString());
Thread.sleep(3000);
}
}
建立一個rest api 來呼叫 Kafka 的訊息生產者
package com.example.anuoapp.controller;
import com.example.anuoapp.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/kafkaAPI")
public class SystemController {
private Logger logger = LoggerFactory.getLogger(SystemController.class);
@Autowired
KafkaProducer kafkaProducer;
@RequestMapping(value = "/produce", method = RequestMethod.GET)
public void WarnInfo() throws Exception {
int count=10;
for (int i = 0; i < count; i++) {
kafkaProducer.kafkaSend();
}
}
}
輸出如下:
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}