Kafka之整合SpringBoot
阿新 • • 發佈:2018-11-19
①pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.yj</groupId> <artifactId>Kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>Kafka</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> <relativePath /> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.16</version> </dependency> </dependencies> </project>
②KafkaListeners
package com.yj.kafka.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.yj.kafka.entity.Product; import com.yj.kafka.entity.User; @Component public class KafkaListeners { private static final Logger log = LoggerFactory.getLogger(KafkaListeners.class); @KafkaListener(topics = "${kafka.topic.user}") public void UserListener(String payload) { log.info("UserListener監聽到資料:"+payload); User user = JSONObject.parseObject(payload, User.class); log.info("user:" + user); } @KafkaListener(topics = "${kafka.topic.product}") public void productListener(String payload) { log.info("productListener監聽到資料:"+payload); Product product = JSONObject.parseObject(payload, Product.class); log.info("product:" + product); } }
③TestController
package com.yj.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.alibaba.fastjson.JSON; import com.yj.kafka.entity.Product; import com.yj.kafka.entity.User; @RestController @RequestMapping("kafka") public class TestController { @Value("${kafka.topic.user}") private String userTopic; @Value("${kafka.topic.product}") private String productTopic; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("sendUser") public String sendUser(@RequestBody User user) { String userStr = JSON.toJSONString(user); kafkaTemplate.send(userTopic, userStr); return "sendUserSuccess"; } @RequestMapping("sendProduct") public String sendProduct(@RequestBody Product product) { String productStr = JSON.toJSONString(product); kafkaTemplate.send(productTopic, productStr); return "sendProductSuccess"; } }
④User,Product
package com.yj.kafka.entity;
public class User {
private String name;
private String pwd;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
@Override
public String toString() {
return "User [name=" + name + ", pwd=" + pwd + "]";
}
}
package com.yj.kafka.entity;
public class Product {
private String name;
private int price;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
@Override
public String toString() {
return "Product [name=" + name + ", price=" + price + "]";
}
}
⑤Application
package com.yj.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
⑥application.properties
spring.kafka.bootstrap-servers=192.168.37.138:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic.user=user
kafka.topic.product=product
⑦驗證
啟動專案後,訪問http://localhost:8080/kafka/sendUser,顯示
2018-11-14 17:00:05.959 INFO 7496 --- [afka-consumer-1] com.yj.kafka.consumer.KafkaListeners : payload:{"name":"yj","pwd":"123456"}
2018-11-14 17:00:05.959 INFO 7496 --- [afka-consumer-1] com.yj.kafka.consumer.KafkaListeners : user:User [name=yj, pwd=123456]
kafka自帶的kafka-console-consumer客戶端也能監聽到資料,當我們執行
sh kafka-console-consumer.sh --bootstrap-server 192.168.37.138:9092 --topic user--from-beginning
也會監聽到資料
{"name":"yj","pwd":"123456"}