1. 程式人生 > >Kafka 入門和 Spring Boot 集成

Kafka 入門和 Spring Boot 集成

serial 發布消息 package 文件中 res 技術分享 搜索 info part

Kafka 入門和 Spring Boot 集成

標簽:博客

[TOC]

概述

kafka 是一個高性能的消息隊列,也是一個分布式流處理平臺(這裏的流指的是數據流)。由java 和 Scala 語言編寫,最早由 LinkedIn 開發,並 2011年開源,現在由 Apache 開發維護。

應用場景

下面列舉了一些kafka常見的應用場景。

消息隊列 : Kafka 可以作為消息隊列使用,可用於系統內異步解耦,流量削峰等場景。

應用監控:利用 Kafka 采集應用程序和服務器健康相關的指標,如應用程序相關的日誌,服務器相關的 CPU、占用率、 IO、內存、連接數、 TPS、 QPS等,然後將指標信息進行處理,從而構建一個具有監控儀表盤、曲線圖等可視化監控系統。 例如, 很多公司采用 Kafka 與 ELK(ElasticSearch、 Logstash 和Kibana)整合構建應用服務的監控系統。

流處理:比如將 kafka 接收到的數據發送給 Storm 流式計算框架處理。

基本概念

record(消息):kafka 通信的基本單位,每一條消息稱為record

producer (生產者 ):發送消息的客戶端。

consumer(消費者 ):消費消息的客戶端。

consumerGroup (消費者組):每一個消費者都屬於一個特定的消費者組。

消費者和消費者組的關系

  • 如果a,b,c 屬於同一個消費者組,那一條消息只能被 a,b,c 中的某一個消費者消費。
  • 如果a,b,c 屬於不同的消費者組(比如 ga,gb,gc) ,那一條消息過來,a,b,c 三個消費者都能消費到。

topic (主題)

: kafka的消息通過topic來分類,類似於數據庫的表。 producer 發布消息到 topic,consumer訂閱 topic 進行消費

partition( 分區):一個topic會被分成一到多個分區(partition),然後多個分區可以分布在不同的機器上,這樣一個主題就相當於運行在了多臺機子上,kafka用分區的方式提高了性能和吞吐量

replica (副本):一個分區有一到多個副本,副本的作用是提高分區的 可用性。

offset(偏移量):偏移量 類似數據庫自增int Id,隨著數據的不斷寫入 kafka 分區內的偏移量會不斷增加,一條消息由一個唯一的偏移量來標識。偏移量的作用是,讓消費者知道自己消費到了哪個位置,下次可以接著從這裏消費。如下圖:

技術分享圖片
消費者A 消費到了 offset 為 9 的記錄,消費者 B 消費到了offset 為 11 的記錄。

基本結構

kafka 最基本的結構如下,跟常見的消息隊列結構一樣。
技術分享圖片
消息通過生產者發送到 kafka 集群, 然後消費者從 kafka 集群拉取消息進行消費。

和Spring Boot 集成

集成概述

本集成方式采用的是 spring boot 官方文檔說的集成方式,官方鏈接,集成的大體思路是,通過在 spring boot application.properties 中配置 生產者和消費者的基本信息,然後spring boot 啟動後會創建 KafkaTemplate 對象,這個對象可以用來發送消息到Kafka,然後用 @KafkaListener 註解來消費 kafka 裏面的消息,具體步驟如下。

集成環境

spring boot:1.5.13 版本
spring-kafka:1.3.5 版本
kafka:1.0.1 版本

kafka 環境搭建

先啟動Zookeeper:

docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime zookeeper:latest 

再啟動Kafka:替換下面的IP為你服務器IP即可

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.10.253 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest 

Spring Boot 和 Spring for Apache Kafka 集成步驟

  1. 首先pom中引入 Spring for Apache Kafka
<!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>
  1. 然後 application.properties 配置文件中加入如下配置:
    各個配置的解釋見:spring boot 附錄中的 kafka 配置,搜索kafka 關鍵字即可定位。
server.port=8090

####### kafka

### producer 配置
spring.kafka.producer.bootstrap-servers=192.168.10.48:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

### consumer 配置
spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092
spring.kafka.consumer.group-id=anuoapp
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5
  1. 創建 Kafka Producer 生產者
package com.example.anuoapp.kafka;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class KafkaProducer {
    @Autowired
    KafkaTemplate kafkaTemplate;

    public void kafkaSend() throws Exception {
        UserAccount userAccount=new UserAccount();
        userAccount.setCard_name("jk");
        userAccount.setAddress("cd");
        ListenableFuture send = kafkaTemplate.send("jktopic", "key", JSON.toJSONString(userAccount));



    }
}
  1. 創建 Kafka Consumer 消費者
package com.example.anuoapp.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);


    @KafkaListener(topics = {"jktopic"})
    public void jktopic(ConsumerRecord consumerRecord) throws InterruptedException {

        System.out.println(consumerRecord.offset());
        System.out.println(consumerRecord.value().toString());
        Thread.sleep(3000);


    }


}
  1. 創建一個rest api 來調用 Kafka 的消息生產者
package com.example.anuoapp.controller;

import com.example.anuoapp.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/api/system")
public class SystemController {
    private Logger logger = LoggerFactory.getLogger(SystemController.class);

    @Autowired
    KafkaProducer kafkaProducer;

    @RequestMapping(value = "/Kafka/send", method = RequestMethod.GET)
    public void WarnInfo() throws Exception {
        int count=10;
        for (int i = 0; i < count; i++) {
            kafkaProducer.kafkaSend();
        }

    }



}
  1. 用 post man 調用 第 5 步創建的接口, 就可以看到 如下消費者產生的輸出信息
30
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
31
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
32
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}

最後

恭喜你 ! spring boot kafka 集成完畢。
完整的基礎源碼見:
鏈接: https://pan.baidu.com/s/1E2Lmbj9A9uruTXG54uPl_g 密碼: e6d6

Kafka 入門和 Spring Boot 集成