Kafka 入門和 Spring Boot 集成
Kafka 入門和 Spring Boot 集成
標簽:博客
[TOC]
概述
kafka 是一個高性能的消息隊列,也是一個分布式流處理平臺(這裏的流指的是數據流)。由java 和 Scala 語言編寫,最早由 LinkedIn 開發,並 2011年開源,現在由 Apache 開發維護。
應用場景
下面列舉了一些kafka常見的應用場景。
消息隊列 : Kafka 可以作為消息隊列使用,可用於系統內異步解耦,流量削峰等場景。
應用監控:利用 Kafka 采集應用程序和服務器健康相關的指標,如應用程序相關的日誌,服務器相關的 CPU、占用率、 IO、內存、連接數、 TPS、 QPS等,然後將指標信息進行處理,從而構建一個具有監控儀表盤、曲線圖等可視化監控系統。 例如, 很多公司采用 Kafka 與 ELK(ElasticSearch、 Logstash 和Kibana)整合構建應用服務的監控系統。
流處理:比如將 kafka 接收到的數據發送給 Storm 流式計算框架處理。
基本概念
record(消息):kafka 通信的基本單位,每一條消息稱為record
producer (生產者 ):發送消息的客戶端。
consumer(消費者 ):消費消息的客戶端。
consumerGroup (消費者組):每一個消費者都屬於一個特定的消費者組。
消費者和消費者組的關系:
- 如果a,b,c 屬於同一個消費者組,那一條消息只能被 a,b,c 中的某一個消費者消費。
- 如果a,b,c 屬於不同的消費者組(比如 ga,gb,gc) ,那一條消息過來,a,b,c 三個消費者都能消費到。
topic (主題)
partition( 分區):一個topic會被分成一到多個分區(partition),然後多個分區可以分布在不同的機器上,這樣一個主題就相當於運行在了多臺機子上,kafka用分區的方式提高了性能和吞吐量
replica (副本):一個分區有一到多個副本,副本的作用是提高分區的 可用性。
offset(偏移量):偏移量 類似數據庫自增int Id,隨著數據的不斷寫入 kafka 分區內的偏移量會不斷增加,一條消息由一個唯一的偏移量來標識。偏移量的作用是,讓消費者知道自己消費到了哪個位置,下次可以接著從這裏消費。如下圖:
消費者A 消費到了 offset 為 9 的記錄,消費者 B 消費到了offset 為 11 的記錄。
基本結構
kafka 最基本的結構如下,跟常見的消息隊列結構一樣。
消息通過生產者發送到 kafka 集群, 然後消費者從 kafka 集群拉取消息進行消費。
和Spring Boot 集成
集成概述
本集成方式采用的是 spring boot 官方文檔說的集成方式,官方鏈接,集成的大體思路是,通過在 spring boot application.properties 中配置 生產者和消費者的基本信息,然後spring boot 啟動後會創建 KafkaTemplate 對象,這個對象可以用來發送消息到Kafka,然後用 @KafkaListener 註解來消費 kafka 裏面的消息,具體步驟如下。
集成環境
spring boot
:1.5.13 版本
spring-kafka
:1.3.5 版本
kafka
:1.0.1 版本
kafka 環境搭建
先啟動Zookeeper:
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime zookeeper:latest
再啟動Kafka:替換下面的IP為你服務器IP即可
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.10.253 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
Spring Boot 和 Spring for Apache Kafka 集成步驟
- 首先pom中引入 Spring for Apache Kafka
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
- 然後 application.properties 配置文件中加入如下配置:
各個配置的解釋見:spring boot 附錄中的 kafka 配置,搜索kafka 關鍵字即可定位。
server.port=8090
####### kafka
### producer 配置
spring.kafka.producer.bootstrap-servers=192.168.10.48:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
### consumer 配置
spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092
spring.kafka.consumer.group-id=anuoapp
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5
- 創建 Kafka Producer 生產者
package com.example.anuoapp.kafka;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@Component
public class KafkaProducer {
@Autowired
KafkaTemplate kafkaTemplate;
public void kafkaSend() throws Exception {
UserAccount userAccount=new UserAccount();
userAccount.setCard_name("jk");
userAccount.setAddress("cd");
ListenableFuture send = kafkaTemplate.send("jktopic", "key", JSON.toJSONString(userAccount));
}
}
- 創建 Kafka Consumer 消費者
package com.example.anuoapp.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = {"jktopic"})
public void jktopic(ConsumerRecord consumerRecord) throws InterruptedException {
System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.value().toString());
Thread.sleep(3000);
}
}
- 創建一個rest api 來調用 Kafka 的消息生產者
package com.example.anuoapp.controller;
import com.example.anuoapp.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/system")
public class SystemController {
private Logger logger = LoggerFactory.getLogger(SystemController.class);
@Autowired
KafkaProducer kafkaProducer;
@RequestMapping(value = "/Kafka/send", method = RequestMethod.GET)
public void WarnInfo() throws Exception {
int count=10;
for (int i = 0; i < count; i++) {
kafkaProducer.kafkaSend();
}
}
}
- 用 post man 調用 第 5 步創建的接口, 就可以看到 如下消費者產生的輸出信息
30
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
31
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
32
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
最後
恭喜你 ! spring boot kafka 集成完畢。
完整的基礎源碼見:
鏈接: https://pan.baidu.com/s/1E2Lmbj9A9uruTXG54uPl_g 密碼: e6d6
Kafka 入門和 Spring Boot 集成