1. 程式人生 > 實用技巧 >SpringBoot整合Kafka訊息元件

SpringBoot整合Kafka訊息元件

1、Kafka是新一代的訊息系統,也是目前效能最好的訊息元件,在資料採集業務中被廣泛應用。這裡Kafka將基於Kerberos認證實現訊息元件處理。

修改pom.xml配置檔案,追加依賴庫配置,如下所示:

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <project xmlns="http://maven.apache.org/POM/4.0.0"
  3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.3.5
.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.example</groupId> 14 <artifactId>demo</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>demo</name> 17
<description>Demo project for Spring Boot</description> 18 19 <properties> 20 <java.version>1.8</java.version> 21 <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version> 22 </properties> 23 24 <dependencies> 25 <dependency> 26 <groupId>org.springframework.boot</groupId> 27 <artifactId>spring-boot-starter-web</artifactId> 28 </dependency> 29 30 <dependency> 31 <groupId>org.springframework.boot</groupId> 32 <artifactId>spring-boot-starter-test</artifactId> 33 <scope>test</scope> 34 <exclusions> 35 <exclusion> 36 <groupId>org.junit.vintage</groupId> 37 <artifactId>junit-vintage-engine</artifactId> 38 </exclusion> 39 </exclusions> 40 </dependency> 41 42 <!-- mysql驅動包 --> 43 <dependency> 44 <groupId>mysql</groupId> 45 <artifactId>mysql-connector-java</artifactId> 46 </dependency> 47 48 <!-- druid連線池 --> 49 <dependency> 50 <groupId>com.alibaba</groupId> 51 <artifactId>druid</artifactId> 52 <version>1.1.10</version> 53 </dependency> 54 55 <dependency> 56 <groupId>org.springframework.boot</groupId> 57 <artifactId>spring-boot-starter-data-jpa</artifactId> 58 </dependency> 59 <dependency> 60 <groupId>org.springframework.boot</groupId> 61 <artifactId>spring-boot-starter-cache</artifactId> 62 </dependency> 63 <dependency> 64 <groupId>org.hibernate</groupId> 65 <artifactId>hibernate-ehcache</artifactId> 66 </dependency> 67 68 <!-- activeMQ --> 69 <dependency> 70 <groupId>org.springframework.boot</groupId> 71 <artifactId>spring-boot-starter-activemq</artifactId> 72 </dependency> 73 74 <!-- rabbitMQ --> 75 <dependency> 76 <groupId>org.springframework.boot</groupId> 77 <artifactId>spring-boot-starter-amqp</artifactId> 78 </dependency> 79 80 <!-- kafka --> 81 <dependency> 82 <groupId>org.springframework.kafka</groupId> 83 <artifactId>spring-kafka</artifactId> 84 </dependency> 85 </dependencies> 86 87 <build> 88 <plugins> 89 <plugin> 90 <groupId>org.springframework.boot</groupId> 91 <artifactId>spring-boot-maven-plugin</artifactId> 92 </plugin> 93 </plugins> 94 <resources> 95 <resource> 96 <directory>src/main/resources</directory> 97 <includes> 98 <include>**/*.properties</include> 99 <include>**/*.yml</include> 100 <include>**/*.xml</include> 101 <include>**/*.p12</include> 102 <include>**/*.html</include> 103 <include>**/*.jpg</include> 104 <include>**/*.png</include> 105 </includes> 106 </resource> 107 </resources> 108 </build> 109 110 </project>

修改pom.xml配置檔案,追加依賴庫配置,如下所示:

 1 # 定義主機列表
 2 spring.kafka.bootstrap-servers=192.168.110.142:9092
 3 # 定義主題名稱
 4 spring.kafka.template.default-topic=test
 5 # 定義生產者配置
 6 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
 7 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 8 # 定義消費者配置
 9 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
10 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
11 # 資料分組
12 spring.kafka.consumer.group-id=group-1

使用Kafka訊息機制實現訊息傳送介面,如下所示:

 1 package com.demo.producer;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.kafka.core.KafkaTemplate;
 5 import org.springframework.stereotype.Service;
 6 
 7 @Service
 8 public class KafkaMessageProducer {
 9 
10     // kafka訊息模板
11     @Autowired
12     private KafkaTemplate<String, String> kafkaTemplate;
13 
14     public void send(String text) {
15         // 傳送訊息
16         this.kafkaTemplate.sendDefault("message-key", text);
17     }
18 
19 }

建立一個Kafka訊息的消費程式類,如下所示:

 1 package com.demo.consumer;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerRecord;
 4 import org.springframework.kafka.annotation.KafkaListener;
 5 import org.springframework.stereotype.Service;
 6 
 7 @Service
 8 public class KafkaMessageConsumer {
 9 
10     /**
11      * 進行訊息接收處理
12      * 
13      * @param record
14      */
15     @KafkaListener(topics = { "test" })
16     public void receiveMessage(ConsumerRecord<String, String> record) {
17         System.err.println("【*** 接收訊息 ***】 key = " + record.key() + " , value = " + record.value());
18     }
19 
20 }

通過測試程式呼叫IMessageProducer介面進行訊息傳送,由於Kafka已經配置了自動建立主題,所以即使現在主題不存在,也不影響程式執行。

 1 package com.demo.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.demo.producer.KafkaMessageProducer;
 9 
10 @Controller
11 public class KafkaMessageController {
12 
13     @Autowired
14     private KafkaMessageProducer kafkaMessageProducer;
15 
16     @RequestMapping(value = "/messageProducer")
17     @ResponseBody
18     public void findAll() {
19         for (int i = 0; i < 20000; i++) {
20             if (i % 20 == 0) {
21                 try {
22                     Thread.sleep(1000);
23                 } catch (InterruptedException e) {
24                     e.printStackTrace();
25                 }
26             }
27             kafkaMessageProducer.send("Kafka producer message : " + i);
28         }
29     }
30 }

如果啟動專案報下面的錯誤,如下所示:

1 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient   : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient   : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
3 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient   : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
4 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient   : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

修改server.properties的兩行預設配置,即可通過外網連線伺服器Kafka,問題解決:

1 # 允許外部埠連線                                            
2 listeners=PLAINTEXT://0.0.0.0:9092  
3 # 外部代理地址                                                
4 advertised.listeners=PLAINTEXT://192.168.110.142:9092