訊息佇列之kafka
阿新 • • 發佈:2021-01-24
[訊息佇列之activeMQ](https://www.cnblogs.com/pluto-charon/p/14225896.html)
[訊息佇列之RabbitMQ](https://www.cnblogs.com/pluto-charon/p/14288765.html)
## 1.kafka介紹
kafka是由scala語言開發的一個多分割槽,多副本的並且居於zookeeper協調的分散式的釋出-訂閱訊息系統。具有高吞吐、可持久化、可水平擴充套件、支援流處理等特性;能夠支撐海量資料的資料傳遞;並且將訊息持久化到磁碟中,並對訊息建立了備份保證了資料的安全。kafka在保證了較高的處理速度的同時,又能保證資料處理的低延遲和資料的零丟失。
kafka的特性:
1. 高吞吐量,低延遲:kafka每秒可以處理幾十萬條訊息,延遲最低大概毫秒,每個主題可以分為多個分割槽,消費組對分割槽進行消費操作
2. 可擴充套件性:支援熱擴充套件
3. 永續性,可靠性:訊息被持久化到本地磁碟,並且支援資料備份
4. 容錯性:允許叢集中節點失敗,如副本的數量為n,則允許n-1個節點失敗
5. 高併發:允許上千個客戶端同時讀寫
6. 可伸縮性:kafka在執行期間可以輕鬆的擴充套件或者收縮;可以擴充套件一個kafka主題來包含更多的分割槽
kafka的主要應用場景:
- 訊息處理
- 網站跟蹤
- 指標儲存
- 日誌聚合
- 流式處理
- 事件朔源
基本流程:
![](https://img2020.cnblogs.com/blog/1459011/202101/1459011-20210123233305608-1701696775.png)
kafka的關鍵角色:
- **Producer:**生產者即資料的釋出者,該角色將訊息釋出到kafka的topic中
- **Consumer:**消費者,可以從broker中讀取資料
- **Consumer Group:**每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)
- **Topic:**劃分資料的所屬類的一個類別屬性
- **Partition:**topic中的資料分割為一個或多個partition,每個topic中至少含有一個partition
- **Partition offset:**每條訊息都有一個當前partition下的唯一的64位元組的offset,它指名了這條訊息的起始位置
- **Replicas of Partition:**副本,是一個分割槽的備份
- **Broker:**kafka叢集中包含一個或多個伺服器 ,伺服器的節點稱為broker
- **Leader:**每個partition由多個副本,其中有且僅有一個作為leader,leader是當前負責資料的讀寫的partition
- **Follower:**Follower跟隨Leader,所有的寫請求都是通過leader路由,資料變更會廣播到所有的follower上,follower與leader的資料保持同步
- **AR:**分割槽中所有的副本統稱為AR
- **ISR:**所有與leader部分保持一定程度的副本組成ISR
- **OSR:**與leader副本同步滯後過多的副本
- **HW:**高水位,標識了一個特定的offset,消費者只能拉去到這個offset之前的訊息
- **LEO:**即日誌末端位移,記錄了該副本底層日誌中的下一條訊息的位移值
## 2.kafka的安裝
安裝kafka的前提是安裝zookeeper以及jdk環境。我這裡安裝的版本是jdk1.8.0_20,kafka_2.11-1.0.0,zookeeper-3.4.14。kafka與jdk的版本一定要對應。我之前用的kafka_2.12_2.3.0,就不行
1.將kafka的檔案上傳到home目錄下並解壓縮到/usr/local目錄下
```shell
root@localhost home]# tar -xvzf kafka_2.11-1.0.0.tgz -C /usr/local
```
2.進入kafka的config
```shell
[root@localhost local]# cd /usr/local/kafka_2.11-1.0.0/config
```
3.編輯server.properties檔案
```yaml
# 如果是叢集環境中,則每個broker.id要設定為不同
broker.id=0
# 將下面這一行開啟,這相當於kafka對外提供服務的入口
listeners=PLAINTEXT://192.168.189.150:9092
# 日誌儲存位置:log.dirs=/tmp/kafka_logs 改為
log.dirs=/usr/local/kafka_2.11-1.0.0/logs
# 修改zookeeper的地址
zookeeper.connect=192.168.189.150:2181
# 修改zookeeper的連線超時時長,預設為6000(可能會超時)
zookeeper.connection.timeout.ms=10000
```
3.啟動zookeeper
因為我是配置的zookeeper叢集,所以需要將三臺zookeeper都啟動。只啟動單臺伺服器zookeeper在選舉的時候將不可進行(當整個叢集超過半數機器宕機,zookeeper會認為叢集處於不可用狀態)
```shell
[root@localhost ~]# zkServer.sh start
# 檢視狀態
[root@localhost ~]# zkServer.sh status
```
4.啟動kafka
```shell
[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh config/server.properties
# 也可以使用後臺啟動的方式,如果不使用後臺啟動,則在啟動後操作需要新開一個窗口才能操作
[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
```
5.建立一個主題
```shell
# --zookeeper: 指定了kafka所連線的zookeeper的服務地址
# --partitions: 指定了分割槽的個數
# --replication-factor: 指定了副本因子
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic charon --partitions 2 --replication-factor 1
Created topic "charon".
```
6.展示所有的主題(驗證建立的主題是否有問題)
```shell
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
charon
```
7.檢視某個主題的詳情
```shell
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic charon
Topic:charon PartitionCount:2 ReplicationFactor:1 Configs:
Topic: charon Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: charon Partition: 1 Leader: 0 Replicas: 0 Isr: 0
```
8.新開一個視窗啟動消費者接收訊息.
--bootstrap-server:指定連線kafka叢集的地址,9092是kafka服務的埠。因為我的配置檔案中配置的是具體地址,所以需要寫明具體地址。否則會報 **[Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available.**的錯
```shell
[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.189.150:9092 --topic charon
```
9.新開一個視窗啟動生產者產生訊息
--bootstrap-server:指定連線kafka叢集的地址,9092是kafka服務的埠。因為我的配置檔案中配置的是地址。
```shell
[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.189.150:9092 --topic charon
```
10.產生訊息並消費訊息
```shell
# 生產者生產訊息
>hello charon good evening
# 消費者這邊接收到的訊息
hello charon good evening
```
當然上面這種方式,只有在同一個網段才能實現。
## 3.生產者和消費者
kafka生產流程:
![](https://img2020.cnblogs.com/blog/1459011/202101/1459011-20210123233438959-1615734513.png)
1)producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader
2)producer將訊息傳送給該leader
3)leader將訊息寫入本地log
4)followers從leader pull訊息,寫入本地log後向leader傳送ACK
5)leader收到所有ISR中的replication的ACK後,增加HW(high watermark,最後commit 的offset)並向producer傳送ACK
消費組:
![](https://img2020.cnblogs.com/blog/1459011/202101/1459011-20210123233518103-587435607.png)
kafka消費者是消費組的一部分,當多個消費者形成一個消費組來消費主題的時候,每個消費者都會收到來自不同分割槽的訊息。假如消費者都在同一個消費者組裡面,則是工作-佇列模型。假如消費者在不同的消費組裡面,則是釋出-訂閱模型。
當單個消費者無法跟上資料的生成速度時,就可以增加更多的消費者來分擔負載,每個消費者只處理部分分割槽的訊息,從而實現單個應用程式的橫向伸縮。但是千萬不要讓消費者的數量少於分割槽的數量,因為此時會有多餘的消費者空閒。
當有多個應用程式都需要從kafka獲取訊息時,讓每個應用程式對應一個消費者組,從而使每個應用程式都能獲取一個或多個topic的全部訊息。每個消費者對應一個執行緒,如果要在同一個消費者組中執行多個消費者,需要讓每個消費者執行在自己的執行緒中。
## 4.程式碼實踐
1.新增依賴:
```xml
```
生產者程式碼:
```java
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @className: Producer
* @description: kafka的生產者
* @author: charon
* @create: 2021-01-18 08:52
*/
public class Producer {
/**topic*/
private static final String topic = "charon";
public static void main(String[] args) {
// 配置kafka的屬性
Properties properties = new Properties();
// 設定地址
properties.put("bootstrap.servers","192.168.189.150:9092");
// 設定應答型別,預設值為0。(0:生產者不會等待kafka的響應;1:kafka的leader會把這條訊息寫到本地日誌檔案中,但不會等待叢集中其他機器的成功響應;
// -1(all):leader會等待所有的follower同步完成,確保訊息不會丟失,除非kafka叢集中的所有機器掛掉,保證可用性)
properties.put("acks","all");
// 設定重試次數,大於0,客戶端會在訊息傳送失敗是重新發送
properties.put("reties",0);
// 設定批量大小,當多條訊息需要傳送到同一個分割槽時,生產者會嘗試合併網路請求,提交效率
properties.put("batch.size",10000);
// 生產者設定序列化方式,預設為:org.apache.kafka.common.serialization.StringSerializer
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 建立生產者
KafkaProducer producer = new KafkaProducer(properties);
for (int i = 0; i < 5; i++) {
String message = "hello,charon message "+ i ;
producer.send(new ProducerRecord(topic,message));
System.out.println("生產者傳送訊息:" + message);
}
producer.close();
}
}
```
消費者程式碼:
```java
package kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
* @className: Consumer
* @description: kafka的消費者
* @author: charon
* @create: 2021-01-18 08:53
*/
public class Consumer implements Runnable{
/**topic*/
private static final String topic = "charon";
/**kafka消費者*/
private static KafkaConsumer kafkaConsumer;
/**消費訊息*/
private static Consume