1. 程式人生 > >Kafka之整合SpringBoot

Kafka之整合SpringBoot

①pom.xml

<!-- Maven build for the Kafka + Spring Boot demo; dependency versions without an
	explicit <version> are managed by spring-boot-starter-parent. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.yj</groupId>
	<artifactId>Kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>Kafka</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.2.RELEASE</version>
		<relativePath />
	</parent>

	<dependencies>
		<!-- Test libraries only: keep them off the runtime classpath and out of the packaged jar -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- KafkaTemplate and @KafkaListener support -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<!-- Embedded web server and Spring MVC for the REST controller -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- JSON (de)serialization of the message payloads.
			NOTE(review): 1.2.16 predates several published fastjson deserialization
			security fixes; consider moving to the latest 1.2.x release. -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.16</version>
		</dependency>
	</dependencies>
</project>

②KafkaListeners

package com.yj.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.yj.kafka.entity.Product;
import com.yj.kafka.entity.User;

/**
 * Kafka consumer endpoints that log every received message together with the
 * entity parsed from it.
 *
 * <p>The topic names are resolved from the {@code kafka.topic.user} and
 * {@code kafka.topic.product} properties.
 */
@Component
public class KafkaListeners {
	private static final Logger log = LoggerFactory.getLogger(KafkaListeners.class);

	/**
	 * Handles a message from the user topic: logs the raw payload, parses it as
	 * JSON into a {@link User} and logs the parsed object.
	 *
	 * @param payload the message value, expected to be a JSON document
	 */
	@KafkaListener(topics = "${kafka.topic.user}")
	public void UserListener(String payload) {
		// Parameterized logging renders the same text as concatenation, but the
		// message is only assembled when INFO is enabled.
		log.info("UserListener監聽到資料:{}", payload);
		User user = JSONObject.parseObject(payload, User.class);
		log.info("user:{}", user);
	}

	/**
	 * Handles a message from the product topic: logs the raw payload, parses it
	 * as JSON into a {@link Product} and logs the parsed object.
	 *
	 * @param payload the message value, expected to be a JSON document
	 */
	@KafkaListener(topics = "${kafka.topic.product}")
	public void productListener(String payload) {
		log.info("productListener監聽到資料:{}", payload);
		Product product = JSONObject.parseObject(payload, Product.class);
		log.info("product:{}", product);
	}
}

③TestController

package com.yj.kafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.yj.kafka.entity.Product;
import com.yj.kafka.entity.User;

/**
 * REST endpoints, mapped under {@code kafka}, that publish the request body to
 * Kafka as a JSON string.
 *
 * <p>The result of each send call is not inspected; the handlers return their
 * fixed response string as soon as the send call returns.
 */
@RestController
@RequestMapping("kafka")
public class TestController {

	/** Name of the topic that receives serialized {@link User} objects. */
	@Value("${kafka.topic.user}")
	private String userTopic;

	/** Name of the topic that receives serialized {@link Product} objects. */
	@Value("${kafka.topic.product}")
	private String productTopic;

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	/**
	 * Serializes the given user to JSON and sends it to the user topic.
	 *
	 * @param user the user bound from the request body
	 * @return the literal {@code "sendUserSuccess"}
	 */
	@RequestMapping("sendUser")
	public String sendUser(@RequestBody User user) {
		kafkaTemplate.send(userTopic, JSON.toJSONString(user));
		return "sendUserSuccess";
	}

	/**
	 * Serializes the given product to JSON and sends it to the product topic.
	 *
	 * @param product the product bound from the request body
	 * @return the literal {@code "sendProductSuccess"}
	 */
	@RequestMapping("sendProduct")
	public String sendProduct(@RequestBody Product product) {
		kafkaTemplate.send(productTopic, JSON.toJSONString(product));
		return "sendProductSuccess";
	}
}

④User,Product

package com.yj.kafka.entity;

/**
 * Mutable bean carrying a user's name and password; it is the payload type of
 * the user topic.
 */
public class User {
	private String name;
	private String pwd;

	/** @return the user name, or {@code null} if it has not been set */
	public String getName() {
		return this.name;
	}

	/** @param name the user name to store */
	public void setName(String name) {
		this.name = name;
	}

	/** @return the password, or {@code null} if it has not been set */
	public String getPwd() {
		return this.pwd;
	}

	/** @param pwd the password to store */
	public void setPwd(String pwd) {
		this.pwd = pwd;
	}

	/**
	 * Renders both fields in the form {@code User [name=..., pwd=...]}.
	 *
	 * @return the textual form of this user
	 */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("User [name=");
		text.append(this.name);
		text.append(", pwd=");
		text.append(this.pwd);
		text.append("]");
		return text.toString();
	}
}
package com.yj.kafka.entity;

/**
 * Mutable bean carrying a product's name and integer price; it is the payload
 * type of the product topic.
 */
public class Product {
	private String name;
	private int price;

	/** @return the product name, or {@code null} if it has not been set */
	public String getName() {
		return this.name;
	}

	/** @param name the product name to store */
	public void setName(String name) {
		this.name = name;
	}

	/** @return the price, {@code 0} if it has not been set */
	public int getPrice() {
		return this.price;
	}

	/** @param price the price to store */
	public void setPrice(int price) {
		this.price = price;
	}

	/**
	 * Renders both fields in the form {@code Product [name=..., price=...]}.
	 *
	 * @return the textual form of this product
	 */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("Product [name=");
		text.append(this.name);
		text.append(", price=");
		text.append(this.price);
		text.append("]");
		return text.toString();
	}
}

⑤Application

package com.yj.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the demo: boots the Spring application using this class as
 * the configuration source.
 */
@SpringBootApplication
public class Application {

	/**
	 * Creates the Spring application and runs it.
	 *
	 * @param args command-line arguments, passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication application = new SpringApplication(Application.class);
		application.run(args);
	}
}

⑥application.properties

# Kafka broker address.
# Values in a .properties file keep trailing whitespace, so no line may end with a space.
spring.kafka.bootstrap-servers=192.168.37.138:9092

# Producer: keys and values are sent as plain strings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Consumer: group id, automatic offset commit, and string deserialization
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Topic names referenced as ${kafka.topic.user} / ${kafka.topic.product} in the Java code
kafka.topic.user=user
kafka.topic.product=product

⑦驗證

啟動專案後,向http://localhost:8080/kafka/sendUser傳送請求(例如以POST方式,Content-Type為application/json,請求體為{"name":"yj","pwd":"123456"}),控制檯顯示

2018-11-14 17:00:05.959  INFO 7496 --- [afka-consumer-1] com.yj.kafka.consumer.KafkaListeners     : UserListener監聽到資料:{"name":"yj","pwd":"123456"}
2018-11-14 17:00:05.959  INFO 7496 --- [afka-consumer-1] com.yj.kafka.consumer.KafkaListeners     : user:User [name=yj, pwd=123456]

Kafka自帶的kafka-console-consumer客戶端也能監聽到資料,當我們執行

# --topic and --from-beginning are separate options and must be separated by a space
sh kafka-console-consumer.sh --bootstrap-server 192.168.37.138:9092 --topic user --from-beginning

也會監聽到資料

{"name":"yj","pwd":"123456"}