Spring Boot and Kafka Integration: Example Tutorial
阿新 • Published: 2018-11-04
1. In IDEA, create a new project named springboot-kafka-producer.
The project's POM file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.miniooc</groupId>
    <artifactId>springboot-kafka-producer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-kafka-producer</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Add the gson dependency -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <!-- Add the lombok dependency -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The two dependencies marked with comments, gson and lombok, were added by hand.
2. Create the message entity class
package com.miniooc.kafka.message;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

@Data
public class OrderBasic implements Serializable {

    /**
     * Order ID
     */
    private String orderId;

    /**
     * Order number
     */
    private String orderNumber;

    /**
     * Order date
     */
    private Date date;

    /**
     * Order details
     */
    private List<String> desc;
}
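For readers who have not used Lombok, here is a hand-written sketch of roughly what @Data contributes, cut down to two of the four fields. The class name PlainOrderBasic is hypothetical, and Lombok's real equals/hashCode differ in detail (it also generates a canEqual method, for example), so this is an approximation rather than the exact generated code.

```java
import java.util.Objects;

// Hypothetical hand-written counterpart of the @Data class, limited to two fields.
class PlainOrderBasic {
    private String orderId;
    private String orderNumber;

    // Getters and setters: @Data generates one pair per non-final field.
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getOrderNumber() { return orderNumber; }
    public void setOrderNumber(String orderNumber) { this.orderNumber = orderNumber; }

    // Field-by-field equality (simplified compared with Lombok's version).
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlainOrderBasic)) return false;
        PlainOrderBasic other = (PlainOrderBasic) o;
        return Objects.equals(orderId, other.orderId)
                && Objects.equals(orderNumber, other.orderNumber);
    }

    // Consistent with equals; the numeric value will not match Lombok's.
    @Override
    public int hashCode() { return Objects.hash(orderId, orderNumber); }

    // Same "ClassName(field=value, ...)" layout that Lombok uses.
    @Override
    public String toString() {
        return "PlainOrderBasic(orderId=" + orderId + ", orderNumber=" + orderNumber + ")";
    }

    public static void main(String[] args) {
        PlainOrderBasic order = new PlainOrderBasic();
        order.setOrderId("1");
        order.setOrderNumber("NO-0001");
        System.out.println(order);
    }
}
```

With @Data none of this needs to be written or maintained by hand, which is why the entity class above stays so short.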
3. Create the message producer class
package com.miniooc.kafka.producer;

import com.google.gson.GsonBuilder;
import com.miniooc.kafka.message.OrderBasic;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Kafka message producer
 */
@Log
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name, read from kafka.topic.order in application.properties
    @Value("${kafka.topic.order}")
    private String topicOrder;

    /**
     * Send an order message
     *
     * @param orderBasic order information
     */
    public void sendOrderMessage(OrderBasic orderBasic) {
        // Serialize the order to pretty-printed JSON with a fixed date pattern
        GsonBuilder builder = new GsonBuilder();
        builder.setPrettyPrinting();
        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = builder.create().toJson(orderBasic);
        kafkaTemplate.send(topicOrder, message);
        log.info("\nProduced message to Kafka\n" + message);
    }
}
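The pattern passed to setDateFormat follows java.text.SimpleDateFormat rules and has no millisecond field, so the date field loses sub-second precision on its way through JSON. The JDK-only sketch below illustrates this. The class name DateFormatDemo and the fixed UTC zone are assumptions chosen so the result is reproducible; they are not part of the original project.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical demo of the date pattern used by the producer and consumer.
class DateFormatDemo {
    static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    // SimpleDateFormat is not thread-safe, so a fresh instance is built per call.
    private static SimpleDateFormat newFormat() {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN, Locale.US);
        format.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: fixed zone for reproducibility
        return format;
    }

    // Render a date the way it would appear in the JSON text.
    static String format(Date date) {
        return newFormat().format(date);
    }

    // Format and parse again, as a producer/consumer pair effectively does.
    static Date roundTrip(Date date) {
        try {
            return newFormat().parse(format(date));
        } catch (ParseException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Date original = new Date(1541289600123L); // 2018-11-04 00:00:00.123 UTC
        System.out.println(format(original));
        System.out.println(original.getTime() + " -> " + roundTrip(original).getTime());
    }
}
```

Both sides must use the same pattern, otherwise the consumer cannot parse the date that the producer wrote.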
4. Edit the resource configuration file application.properties
server.port=9527
spring.application.name=kafka-consumer
kafka.bootstrap.servers=localhost:9092
kafka.topic.order=topic-order
kafka.group.id=group-order
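The kafka.* keys above are custom names rather than Spring Boot's own spring.kafka.* properties, and the code that reads kafka.bootstrap.servers and kafka.group.id is not shown in this tutorial. One plausible arrangement is a small mapping from these keys to Kafka client settings. The sketch below shows that mapping with the JDK only. The class name KafkaProps and its methods are hypothetical; the map keys (bootstrap.servers, group.id, key.deserializer, value.deserializer) are standard Kafka consumer settings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: maps the tutorial's custom keys to Kafka consumer settings.
class KafkaProps {
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Parse properties-file text (stands in for reading application.properties).
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Build the consumer configuration map from the custom kafka.* keys.
    static Map<String, Object> consumerConfig(Properties app) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", app.getProperty("kafka.bootstrap.servers"));
        config.put("group.id", app.getProperty("kafka.group.id"));
        // Messages are plain JSON strings, so keys and values use the string deserializer.
        config.put("key.deserializer", STRING_DESERIALIZER);
        config.put("value.deserializer", STRING_DESERIALIZER);
        return config;
    }

    public static void main(String[] args) {
        Properties app = parse(
                "kafka.bootstrap.servers=localhost:9092\n"
              + "kafka.topic.order=topic-order\n"
              + "kafka.group.id=group-order\n");
        System.out.println(consumerConfig(app));
    }
}
```

In a Spring application a map like this would typically be handed to a consumer factory bean. That wiring is left out here because the original does not show it.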
5. Start ZooKeeper (run from the Kafka installation directory)
bin/zookeeper-server-start.sh config/zookeeper.properties
6. Start Kafka (run from the Kafka installation directory)
bin/kafka-server-start.sh config/server.properties
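Before running the project, it can help to confirm that both processes are accepting connections. The JDK-only sketch below assumes the default ports: 9092 for Kafka, matching kafka.bootstrap.servers, and 2181 for ZooKeeper, the clientPort in the bundled config/zookeeper.properties. Adjust them if your setup differs. The class name PortCheck is hypothetical. A successful TCP connect shows only that something is listening on the port, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class PortCheck {
    // Returns true if a connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolved host: treat as not listening.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports are assumptions based on the default configuration files.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```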
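7. Run the project and call the message producer class through a controller to send a message to Kafka.
If the console shows the producer's log entry followed by the message JSON (the part outlined in red in the original screenshot), the message was sent successfully.
8. Using IDEA's new-project wizard again, create the message consumer project springboot-kafka-consumer.
9. Create the message consumer class and listen to the topic.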
package com.miniooc.kafka.consumer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.miniooc.kafka.message.OrderBasic;
import lombok.extern.java.Log;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Kafka message consumer
 */
@Log
@Component
public class KafkaConsumer {

    // Listen on the topic configured as kafka.topic.order
    @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload String message) {
        // Use the same date pattern as the producer so the date field parses correctly
        GsonBuilder builder = new GsonBuilder();
        builder.setPrettyPrinting();
        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
        Gson gson = builder.create();
        // OrderBasic is not generic, so the class literal is enough (no TypeToken needed)
        OrderBasic orderBasic = gson.fromJson(message, OrderBasic.class);
        String json = gson.toJson(orderBasic);
        log.info("\nReceived and consumed message\n" + json);
    }
}
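10. Run the project.
If the console shows the consumer's log entry followed by the message JSON (the part outlined in red in the original screenshot), the message was consumed successfully.
The Spring Boot and Kafka integration is complete!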