Spring 系列 (16) - Springboot+Kafka 實現釋出/訂閱訊息
訊息是一個位元組陣列,可以在這些位元組陣列中儲存任何物件,支援的資料格式包括 String、JSON 等。
訊息佇列 (Message Queue): 是分散式系統中重要的組成部分,主要解決應用解耦、非同步訊息、流量削鋒等問題,實現高效能、高可用、可伸縮和最終一致性架構。目前使用較多的訊息佇列有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
Message Queue 的通訊模式:
(1) 點對點通訊:點對點方式是最為傳統和常見的通訊方式,它支援一對一、一對多、多對多、多對一等多種配置方式,支援樹狀、網狀等多種拓撲結構。
(2) 多點廣播:將訊息傳送到多個目標站點 (Destination List)。可以使用一條 MQ 指令將單一訊息傳送到多個目標站點,並確保為每一站點可靠地提供資訊。MQ 不僅提供了多點廣播的功能,而且還擁有智慧訊息分發功能,在將一條訊息傳送到同一系統上的多個使用者時,MQ 將訊息的一個複製版本和該系統上接收者的名單傳送到目標 MQ 系統。目標 MQ 系統在本地複製這些訊息,並將它們傳送到名單上的佇列,從而儘可能減少網路的傳輸量。
(3) 釋出/訂閱 (Publish/Subscribe) 模式:釋出/訂閱功能使訊息的分發可以突破目的佇列地理指向的限制,使訊息按照特定的主題甚至內容進行分發,使用者或應用程式可以根據主題或內容接收到所需要的訊息。釋出/訂閱功能使得傳送者和接收者之間的耦合關係變得更為鬆散,傳送者不必關心接收者的目的地址,而接收者也不必關心訊息的傳送地址,而只是根據訊息的主題進行訊息的收發。
(4) 群集 (Cluster):為了簡化點對點通訊模式中的系統配置,MQ 提供 Cluster(群集) 的解決方案。群集類似於一個域 (Domain),群集內部的佇列管理器之間通訊時,不需要兩兩之間建立訊息通道,而是採用群集 (Cluster) 通道與其它成員通訊,從而大大簡化了系統配置。此外,群集中的佇列管理器之間能夠自動進行負載均衡,當某一佇列管理器出現故障時,其它佇列管理器可以接管它的工作,從而大大提高系統的高可靠性。
Apache Kafka 是一個分散式釋出/訂閱訊息系統和一個強大的訊息佇列 (MQ),可以處理大量的訊息(資料),並使您能夠將訊息從一個端點傳遞到另一個端點。 Kafka 適合離線和線上訊息消費。 Kafka 訊息保留在磁碟上,並在叢集集內複製以防止資料丟失。 Kafka 構建在 ZooKeeper 同步服務之上。 它與 Apache Storm 和 Spark 很容易整合,用於實時流式資料分析。
Kafka 通過給每一個訊息繫結一個鍵值的方式來保證生產者可以把所有的訊息傳送到指定位置。屬於某一個消費者群組的消費者訂閱了一個主題,通過該訂閱消費者可以跨節點地接收所有與該主題相關的訊息,每一個訊息只會傳送給群組中的一個消費者,所有擁有相同鍵值的訊息都會被確保發給這一個消費者。
Kafka 術語:
(1) Broker (訊息代理):Kafka 叢集包含一個或多個伺服器,這種伺服器被稱為 broker。
(2) Topic (主題):每條釋出到 Kafka 叢集的訊息都有一個類別,這個類別被稱為 Topic。(物理上不同 Topic 的訊息分開儲存,邏輯上一個 Topic 的訊息雖然保存於一個或多個 broker 上,但使用者只需指定訊息的 Topic 即可生產或消費資料而不必關心資料存於何處)。
(3) Partition(分割槽):Partition 是物理上的概念,每個 Topic 包含一個或多個 Partition。
(4) Producer(生產者):負責釋出訊息到 Kafka broker。
(5) Consumer(消費者):訊息消費者是從 Kafka broker 讀取訊息的客戶端。
(6) Consumer Group(消費者組):每個 Consumer 屬於一個特定的 Consumer Group(可為每個 Consumer 指定 group name,若不指定 group name 則屬於預設的 group)。
Kafka:https://kafka.apache.org/
Kafka GitHub:https://github.com/apache/kafka
ZooKeeper 是 Apache 的一個頂級專案,為分散式應用提供高效、高可用的分散式協調服務,提供了諸如資料釋出/訂閱、負載均衡、命名服務、分散式協調/通知和分散式鎖等分散式基礎服務。由於 ZooKeeper 便捷的使用方式、卓越的效能(基於記憶體)和良好的穩定性,被廣泛地應用於諸如 Hadoop、HBase、Kafka 和 Dubbo 等大型分散式系統中。
ZooKeeper 術語:
(1) Leader(領導者):負責進行投票的發起和決議,更新系統狀態。
(2) Follower(跟隨者):用於接收客戶端請求並給客戶端返回結果,在選主過程中進行投票。
(3) Observer(觀察者):可以接受客戶端連線,將寫請求轉發給 leader,但是observer 不參加投票的過程,只是為了擴充套件系統,提高讀取的速度。
ZooKeeper:https://zookeeper.apache.org
Zookeeper GitHub: https://github.com/apache/zookeeper
1. Kafka 安裝配置
1) Windows 下安裝
下載 https://archive.apache.org/dist/kafka/3.0.1/kafka_2.13-3.0.1.tgz,儲存到目錄 C:\Applications\Java\,kafka_2.13-3.0.1.tgz 檔名裡的 2.13 是對應 Scala 版本 2.13,3.0.1 是 Kafka 的版本。
啟動一個 cmd 視窗,進入目錄 C:\Applications\Java\,執行如下命令。
C:\Applications\Java\> tar -zvxf kafka_2.13-3.0.1.tgz
C:\Applications\Java\> move kafka_2.13-3.0.1 kafka-3.0.1
C:\Applications\Java\> cd kafka-3.0.1
C:\Applications\Java\kafka-3.0.1> mkdir data
C:\Applications\Java\kafka-3.0.1> mkdir logs
修改 C:\Applications\Java\kafka-3.0.1\config\server.properties 檔案裡的 log.dirs:
log.dirs=C:/Applications/Java/kafka-3.0.1/logs
Kafka 執行包裡自帶 ZooKeeper,修改 C:\Applications\Java\kafka-3.0.1\config\zookeeper.properties 檔案裡的 dataDir:
dataDir=C:/Applications/Java/kafka-3.0.1/data
在一個主機上執行一個 Broker:
(1) 啟動 Kafka & Zookeeper
在啟動 Kafka 之前,要先啟動 Zookeeper Server 。
C:\Applications\Java\kafka-3.0.1\bin\windows>zookeeper-server-start ..\..\config\zookeeper.properties
C:\Applications\Java\kafka-3.0.1\bin\windows>kafka-server-start ..\..\config\server.properties
# 檢視狀態
> jps
2501 Jps
1991 QuorumPeerMain
2024 Kafka
QuorumPeerMain 是 ZooKeeper 守護程序,2024 是 Kafka 守護程序。
Zookeeper 預設埠是 2181, Kafka broker 預設埠是 9092。
(2) 停止 Kafka & Zookeeper
C:\Applications\Java\kafka-3.0.1\bin\windows>kafka-server-stop
C:\Applications\Java\kafka-3.0.1\bin\windows>zookeeper-server-stop
Kafka 的執行方式:
(1) 在一個主機上執行一個 Broker;
(2) 在一個主機上執行多個 Brokers;
(3) 在多個主機上執行多個 Brokers。
注:如果不使用 Kafka 自帶的 Zookeeper,可以獨立安裝配置 Zookeeper。
2) Ubuntu 下安裝
$ wget https://archive.apache.org/dist/kafka/3.0.1/kafka_2.13-3.0.1.tgz
$ mv ./kafka_2.13-3.0.1.tgz ~/apps/ # 這裡使用 /home/xxx/apps 目錄, xxx 是使用者根目錄
$ tar -zvxf kafka_2.13-3.0.1.tgz
$ mv kafka_2.13-3.0.1 kafka-3.0.1
$ cd kafka-3.0.1
$ mkdir data
$ mkdir logs
修改 ~/apps/kafka-3.0.1/config/server.properties 檔案裡的 log.dirs:
log.dirs=/home/xxx/apps/kafka-3.0.1/logs
Kafka 執行包裡自帶 ZooKeeper,修改 ~/apps/kafka-3.0.1/config/zookeeper.properties 檔案裡的 dataDir:
dataDir=/home/xxx/apps/kafka-3.0.1/data
在一臺主機上執行一個 Broker:
(1) 啟動 Kafka
在啟動 Kafka 之前,要確保 Zookeeper Server 已經在執行。
$ ./bin/zookeeper-server-start.sh ../config/zookeeper.properties
$ ./bin/kafka-server-start.sh ../config/server.properties
# 檢視狀態
$ jps
2501 Jps
1991 QuorumPeerMain
2024 Kafka
QuorumPeerMain 是 ZooKeeper 守護程序,2024 是 Kafka 守護程序。
Zookeeper 預設埠是 2181, Kafka broker 預設埠是 9092。
(2) 停止 Kafka
$ ./bin/kafka-server-stop.sh
2. 獨立 ZooKeeper 安裝配置 (非必需)
1) Windows 下安裝
下載 https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz,儲存到目錄 C:\Applications\Java\。
啟動一個 cmd 視窗,進入目錄 C:\Applications\Java\,執行如下命令。
C:\Applications\Java\> tar -zvxf apache-zookeeper-3.6.3-bin.tar.gz
C:\Applications\Java\> move apache-zookeeper-3.6.3-bin zookeeper-3.6.3
C:\Applications\Java\> cd zookeeper-3.6.3
C:\Applications\Java\zookeeper-3.6.3> mkdir data
複製 C:\Applications\Java\zookeeper-3.6.3\conf\zoo_sample.cfg 生成 C:\Applications\Java\zookeeper-3.6.3\conf\zoo.cfg,修改 zoo.cfg 檔案裡的 dataDir :
dataDir=C:\Applications\Java\zookeeper-3.6.3\data
執行 ZooKeeper Server
C:\Applications\Java\zookeeper-3.6.3\bin> zkServer
執行 ZooKeeper Cli
C:\Applications\Java\zookeeper-3.6.3\bin> zkCli
2) Ubuntu 下安裝
$ wget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
$ mv ./apache-zookeeper-3.6.3-bin.tar.gz ~/apps/ # 這裡使用 /home/xxx/apps 目錄, xxx 是使用者根目錄
$ tar -zvxf apache-zookeeper-3.6.3-bin.tar.gz
$ mv apache-zookeeper-3.6.3-bin zookeeper-3.6.3
$ cd zookeeper-3.6.3
$ mkdir data
$ cp conf/zoo_sample.cfg conf/zoo.cfg
$ vim conf/zoo.cfg
dataDir = /home/xxx/apps/zookeeper-3.6.3/data
啟動 ZooKeeper Server
$ ./bin/zkServer.sh start
執行 ZooKeeper Cli
$ ./bin/zkCli.sh
停止 ZooKeeper server
$ ./bin/zkServer.sh stop
3. 開發環境
Windows版本:Windows 10 Home (20H2)
IntelliJ IDEA (https://www.jetbrains.com/idea/download/):Community Edition for Windows 2020.1.4
Apache Maven (https://maven.apache.org/):3.8.1
注:Spring 開發環境的搭建,可以參考 “ Spring基礎知識(1)- Spring簡介、Spring體系結構和開發環境配置 ”。
4. 建立 Spring Boot 基礎專案
專案例項名稱:SpringbootExample16
Spring Boot 版本:2.6.6
建立步驟:
(1) 建立 Maven 專案例項 SpringbootExample16;
(2) Spring Boot Web 配置;
(3) 匯入 Thymeleaf 依賴包;
(4) 配置 jQuery;
具體操作請參考 “Spring 系列 (2) - 在 Spring Boot 專案裡使用 Thymeleaf、JQuery+Bootstrap 和國際化” 裡的專案例項 SpringbootExample02,文末包含如何使用 spring-boot-maven-plugin 外掛執行打包的內容。
SpringbootExample16 和 SpringbootExample02 相比,SpringbootExample16 不配置 Bootstrap、模版檔案(templates/*.html)和國際化。
5. 配置 Kafka
1) 修改 pom.xml,匯入 Kafka 依賴包
1 <project ... > 2 ... 3 <dependencies> 4 ... 5 6 <dependency> 7 <groupId>org.springframework.kafka</groupId> 8 <artifactId>spring-kafka</artifactId> 9 </dependency> 10 11 ... 12 </dependencies> 13 14 ... 15 </project>
在IDE中專案列表 -> SpringbootExample16 -> 點選滑鼠右鍵 -> Maven -> Reload Project
2) 修改 src/main/resources/application.properties 檔案,新增如下配置
########### Kafka 叢集 ###########
spring.kafka.bootstrap-servers=localhost:9092
########### 初始化生產者配置 ###########
# 生產端緩衝區大小 (30 MB)
spring.kafka.producer.buffer-memory=31457280
# Kafka 提供的序列化和反序列化類
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 重試次數
#spring.kafka.producer.retries=0
# 應答級別: 多少個分割槽副本備份完成時向生產者傳送 ack 確認(可選0、1、all/-1)
#spring.kafka.producer.acks=1
# 批量大小
#spring.kafka.producer.batch-size=16384
# 提交延時(當生產端積累的訊息達到 batch-size 或接收到訊息 linger.ms 後,
# 生產者就會將訊息提交給 kafka linger.ms 為 0 表示每接收到一條訊息就提交給 kafka,
# 這時候 batch-size 其實就沒用了)
#spring.kafka.producer.properties.linger.ms=0
# 自定義分割槽器
#spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
########### 初始化消費者配置 ###########
# 預設的消費組 ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自動提交 offset
spring.kafka.consumer.enable-auto-commit=true
# 提交 offset 延時(接收到訊息後多久提交 offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# Kafka 提供的序列化和反序列化類
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 當 kafka 中沒有初始 offset 或 offset 超出範圍時將自動重置 offset
# earliest: 重置為分割槽中最小的 offset;
# latest: 重置為分割槽中最新的 offset(消費分割槽中新產生的資料);
# none: 只要有一個分割槽不存在已提交的 offset,就丟擲異常;
#spring.kafka.consumer.auto-offset-reset=latest
# 消費會話超時時間(超過這個時間 consumer 沒有傳送心跳,就會觸發 rebalance 操作)
#spring.kafka.consumer.properties.session.timeout.ms=120000
# 消費請求超時時間
#spring.kafka.consumer.properties.request.timeout.ms=180000
# 消費端監聽的 topic 不存在時,專案啟動會報錯(關掉)
#spring.kafka.listener.missing-topics-fatal=false
# 設定批量消費
#spring.kafka.listener.type=batch
# 批量消費每次最多消費多少條訊息
#spring.kafka.consumer.max-poll-records=50
注:這裡對 Kafka 詳細的引數配置不做介紹,但本文保留了(註釋掉了)沒用到的引數。
6. 測試例項 (Web 模式)
1) 建立 src/main/java/com/example/kafka/KafkaConsumerListener.java 檔案
1 package com.example.kafka; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 import org.springframework.stereotype.Component; 7 import org.apache.kafka.clients.consumer.ConsumerRecord; 8 import org.springframework.kafka.annotation.KafkaListener; 9 10 @Component 11 public class KafkaConsumerListener { 12 13 public List<String> messageList = new ArrayList<>(); 14 15 @KafkaListener(topics = {"topic1"}) 16 public void onMessage(ConsumerRecord<?, ?> record) { 17 System.out.println("Kafka Consumer: " + record.topic() + " - " + record.partition() + " - " + record.value()); 18 19 if (record != null) 20 messageList.add((String)record.value()); 21 } 22 23 }
注:消費者使用 @KafkaListener 監聽 topic1 主題,把收到的訊息儲存到 messageList 。
2) 建立 src/main/resources/templates/client.html 檔案
1 <html lang="en" xmlns:th="http://www.thymeleaf.org"> 2 <head> 3 <meta charset="UTF-8"> 4 <title th:text="${var}">Title</title> 5 <script language="javascript" th:src="@{/lib/jquery/jquery-3.6.0.min.js}"></script> 6 </head> 7 <body> 8 9 <h3>Kafka Client</h3> 10 <hr> 11 12 <p>Producer</p> 13 <p> 14 <input type="text" name="message" id="message" style="width: 320px; height: 32px;" value="" /> 15 <button type="button" id="btn_send">Send</button> 16 </p> 17 <div id="producer_result" style="width: 50%; padding: 15px;"></div> 18 <p> </p> 19 <hr> 20 <p>Consumer</p> 21 <p><button type="button" id="btn_receive">Receive</button></p> 22 <div id="consumer_result" style="width: 50%; padding: 15px;"></div> 23 24 <p> </p> 25 <script type="text/javascript"> 26 27 $(document).ready(function(){ 28 29 $('#btn_send').click(function() { 30 31 var msg = $('#message').val(); 32 if (msg == '') { 33 alert('Please enter message'); 34 $('#message').focus(); 35 return; 36 } 37 38 $.ajax({ 39 40 type: 'POST', 41 url: '/message/post', 42 data: { 43 message: msg, 44 }, 45 success: function(response) { 46 console.log(response); 47 if (response.ret == "OK") { 48 $('#producer_result').append("Sent '" + msg + "'<br>"); 49 } else { 50 $('#producer_result').append("Failed to send '" + msg + "'<br>"); 51 } 52 }, 53 error: function(err) { 54 console.log(err); 55 $('#producer_result').append("Error: AJAX issue<br>"); 56 } 57 }); 58 59 }); 60 61 $('#btn_receive').click(function() { 62 63 $.ajax({ 64 type: 'GET', 65 url: '/message/get', 66 success: function(response) { 67 console.log(response); 68 if (response.ret == "OK") { 69 $('#consumer_result').html(response.message + "<br>"); 70 } else { 71 $('#consumer_result').html("No message<br>"); 72 } 73 }, 74 error: function(err) { 75 console.log(err); 76 $('#consumer_result').html("Error: AJAX issue<br>"); 77 }, 78 }); 79 }); 80 }); 81 82 </script> 83 84 </body> 85 </html>
3) 修改 src/main/java/com/example/controller/IndexController.java 檔案
1 package com.example.controller; 2 3 import java.util.Map; 4 import java.util.HashMap; 5 6 import org.springframework.stereotype.Controller; 7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.web.bind.annotation.RequestMapping; 9 import org.springframework.web.bind.annotation.RequestParam; 10 import org.springframework.web.bind.annotation.ResponseBody; 11 12 import org.springframework.kafka.core.KafkaTemplate; 13 import com.example.kafka.KafkaConsumerListener; 14 15 @Controller 16 public class IndexController { 17 @Autowired 18 private KafkaTemplate<String, Object> kafkaTemplate; 19 @Autowired 20 private KafkaConsumerListener kafkaConsumerListener; 21 22 @ResponseBody 23 @RequestMapping("/test") 24 public String test() { 25 return "Test Page"; 26 } 27 28 @RequestMapping("/client") 29 public String client() { 30 return "client"; 31 } 32 33 @ResponseBody 34 @RequestMapping("/message/post") 35 public Map<String, String> messagePost(@RequestParam String message) { 36 37 Map<String, String> map = new HashMap(); 38 if ("".equals(message)) { 39 map.put("ret", "ERROR"); 40 map.put("description", "Message is empty"); 41 } else { 42 43 // 生產者傳送訊息到 topic1 主題 44 kafkaTemplate.send("topic1", message); 45 map.put("ret", "OK"); 46 map.put("description", "Message has been sent"); 47 } 48 49 return map; 50 } 51 52 @ResponseBody 53 @RequestMapping("/message/get") 54 public Map<String, String> messageGet() { 55 56 Map<String, String> map = new HashMap(); 57 58 if (kafkaConsumerListener.messageList.size() == 0) { 59 map.put("ret", "ERROR"); 60 map.put("description", "Message is empty"); 61 } else { 62 String str = ""; 63 for (String item : kafkaConsumerListener.messageList) { 64 if ("".equals(str)) { 65 str = item; 66 } else { 67 str += "<br>" + item; 68 } 69 } 70 71 map.put("ret", "OK"); 72 map.put("message", str); 73 } 74 75 return map; 76 } 77 78 }
注:Spring Bean 的作用域被預設為 singleton,這裡自動裝配的 kafkaConsumerListener 就是那個唯一的 Bean,所以可以在 IndexController 裡讀取到消費者儲存的資料。
執行並訪問 http://localhost:9090/client,確保 Kafka 和 ZooKeeper 服務已正常執行,在頁面上 Producer 部分發送訊息,在 Consumer 部分接收訊息,訊息有延時。