
How to Use Kafka in Spring Boot 2.x (Kafka 1.1.0)

Overview

This article follows the integration approach described in the official Spring Boot documentation; see Apache Kafka Support.

The idea is to configure the basic producer and consumer settings in the Spring Boot application.properties file. On startup, Spring Boot creates a KafkaTemplate bean, which is used to send messages to Kafka; messages are then consumed by methods annotated with @KafkaListener.

Versions:

spring boot: 2.0.3.RELEASE
spring-kafka: 2.1.7.RELEASE
kafka: 1.1.0 

Note: ZooKeeper and Kafka must be installed, configured, and running before you start.

Spring Boot and Spring for Apache Kafka integration steps

  1. Add the Spring Kafka dependency to pom.xml:
<!-- kafka -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.1.7.RELEASE</version>
</dependency>

Then add the following to the application.properties configuration file:

server.port=8090

####### kafka

### producer configuration
spring.kafka.producer.bootstrap-servers=192.168.10.48:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

### consumer configuration
spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092
spring.kafka.consumer.group-id=anuoapp
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5
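Before running the application, the topic should exist (unless the broker has auto-creation enabled). Since spring.kafka.listener.concurrency is 5, the topic needs at least 5 partitions for all listener threads to receive work. A possible setup command with the Kafka 1.1.0 CLI, assuming ZooKeeper listens on 192.168.10.48:2181 (adjust for your environment):

```shell
# Create "mytopic" with 5 partitions to match the listener concurrency above.
bin/kafka-topics.sh --create \
  --zookeeper 192.168.10.48:2181 \
  --replication-factor 1 \
  --partitions 5 \
  --topic mytopic
```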

Create the Kafka producer

package com.example.anuoapp.kafka;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class KafkaProducer {
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    public void kafkaSend() throws Exception {
        UserAccount userAccount = new UserAccount();
        userAccount.setCard_name("jk");
        userAccount.setAddress("cd");
        // send(topic, key, value) is asynchronous; the returned future can be
        // used to check whether the broker acknowledged the message.
        ListenableFuture<SendResult<String, String>> send =
                kafkaTemplate.send("mytopic", "key", JSON.toJSONString(userAccount));
    }
}
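The UserAccount class used above is not shown in the original article. A minimal sketch consistent with the JSON output at the end of this article might look like the following; the field names and the extra boolean properties are assumptions inferred from that output:

```java
// Hypothetical minimal UserAccount POJO, inferred from the serialized JSON.
public class UserAccount {
    private String card_name;
    private String address;
    private boolean bind_qq;
    private boolean bind_weixin;
    private boolean passwordDirectCompare;

    public String getCard_name() { return card_name; }
    public void setCard_name(String cardName) { this.card_name = cardName; }

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }

    public boolean isBind_qq() { return bind_qq; }
    public void setBind_qq(boolean bindQq) { this.bind_qq = bindQq; }

    public boolean isBind_weixin() { return bind_weixin; }
    public void setBind_weixin(boolean bindWeixin) { this.bind_weixin = bindWeixin; }

    public boolean isPasswordDirectCompare() { return passwordDirectCompare; }
    public void setPasswordDirectCompare(boolean v) { this.passwordDirectCompare = v; }
}
```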

Create the Kafka consumer

package com.example.anuoapp.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // Invoked for each record received on "mytopic".
    @KafkaListener(topics = {"mytopic"})
    public void jktopic(ConsumerRecord<String, String> consumerRecord) throws InterruptedException {
        System.out.println(consumerRecord.offset());
        System.out.println(consumerRecord.value());
        Thread.sleep(3000); // simulate slow processing
    }
}

Create a REST API to invoke the Kafka producer

package com.example.anuoapp.controller;

import com.example.anuoapp.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafkaAPI")
public class SystemController {
    private Logger logger = LoggerFactory.getLogger(SystemController.class);

    @Autowired
    KafkaProducer kafkaProducer;

    @RequestMapping(value = "/produce", method = RequestMethod.GET)
    public void warnInfo() throws Exception {
        // Send ten messages per request.
        int count = 10;
        for (int i = 0; i < count; i++) {
            kafkaProducer.kafkaSend();
        }
    }
}
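With the application running on port 8090 as configured above, the endpoint can be exercised with curl; the consumer then prints each record's offset and value:

```shell
# Trigger ten messages to "mytopic" via the REST endpoint.
curl http://localhost:8090/kafkaAPI/produce
```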

The consumer output looks like this:

{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}  
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}  
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}