1. 程式人生 > 實用技巧 >Java程式設計師說:世界上有三個偉大的發明【火、輪子、kafka】

Java程式設計師說:世界上有三個偉大的發明【火、輪子、kafka】

一、Kafka 是什麼?

有人說世界上有三個偉大的發明:火,輪子,以及 Kafka。

發展到現在,Apache Kafka 無疑是很成功的,Confluent 公司曾表示世界五百強中有三分之一的企業在使用 Kafka。在流式計算中,Kafka 一般用來快取資料,例如 Flink 通過消費 Kafka 的資料進行計算。

關於Kafka,我們最先需要了解的是以下四點:

  1. Apache Kafka 是一個開源「訊息」系統,由 Scala 寫成。是由 Apache 軟體基金會開發的 一個開源訊息系統專案。
  2. Kafka 最初是由 LinkedIn 公司開發,用作 LinkedIn 的活動流(Activity Stream)和運營資料處理管道(Pipeline)的基礎,現在它已被多家不同型別的公司作為多種型別的資料管道和訊息系統使用。
  3. 「Kafka 是一個分散式訊息佇列」。Kafka 對訊息儲存時根據 Topic 進行歸類,傳送訊息 者稱為 Producer,訊息接受者稱為 Consumer,此外 kafka 叢集有多個 kafka 例項組成,每個 例項(server)稱為 broker。
  4. 無論是 kafka 叢集,還是 consumer 都依賴於「Zookeeper」叢集儲存一些 meta 資訊, 來保證系統可用性。

二、為什麼要有 Kafka?

「kafka」之所以受到越來越多的青睞,與它所扮演的三大角色是分不開的的:

  • 「訊息系統」:kafka與傳統的訊息中介軟體都具備系統解耦、冗餘儲存、流量削峰、緩衝、非同步通訊、擴充套件性、可恢復性等功能。與此同時,kafka還提供了大多數訊息系統難以實現的訊息順序性保障及回溯性消費的功能。
  • 「儲存系統」:kafka把訊息持久化到磁碟,相比於其他基於記憶體儲存的系統而言,有效的降低了訊息丟失的風險。這得益於其訊息持久化和多副本機制。也可以將kafka作為長期的儲存系統來使用,只需要把對應的資料保留策略設定為“永久”或啟用主題日誌壓縮功能。
  • 「流式處理平臺」:kafka為流行的流式處理框架提供了可靠的資料來源,還提供了一個完整的流式處理框架,比如視窗、連線、變換和聚合等各類操作。

Kafka特性分散式具備經濟、快速、可靠、易擴充、資料共享、裝置共享、通訊方便、靈活等,分散式所具備的特性高吞吐量同時為資料生產者和消費者提高吞吐量高可靠性支援多個消費者,當某個消費者失敗的時候,能夠自動負載均衡離線能將訊息持久化,進行批量處理解耦作為各個系統連線的橋樑,避免系統之間的耦合

三、Kafka 基本概念

在深入理解 Kafka 之前,可以先了解下 Kafka 的基本概念。

一個典型的 Kafka 包含若干Producer、若干 Broker、若干 Consumer 以及一個 Zookeeper 叢集。Zookeeper 是 Kafka 用來負責叢集元資料管理、控制器選舉等操作的。Producer 是負責將訊息傳送到 Broker 的,Broker 負責將訊息持久化到磁碟,而 Consumer 是負責從Broker 訂閱並消費訊息。Kafka體系結構如下所示:

概念一:生產者(Producer)與消費者(Consumer)

生產者和消費者

對於 Kafka 來說客戶端有兩種基本型別:「生產者」(Producer)和「消費者」(Consumer)。除此之外,還有用來做資料整合的 Kafka Connect API 和流式處理的「Kafka Streams」等高階客戶端,但這些高階客戶端底層仍然是生產者和消費者API,只不過是在上層做了封裝。

  • 「Producer」:訊息生產者,就是向 Kafka broker 發訊息的客戶端;
  • 「Consumer」:訊息消費者,向 Kafka broker 取訊息的客戶端;

概念二:Broker 和叢集(Cluster)

一個 Kafka 伺服器也稱為「Broker」,它接受生產者傳送的訊息並存入磁碟;Broker 同時服務消費者拉取分割槽訊息的請求,返回目前已經提交的訊息。使用特定的機器硬體,一個 Broker 每秒可以處理成千上萬的分割槽和百萬量級的訊息。

若干個 Broker 組成一個「叢集」「Cluster」),其中叢集內某個 Broker 會成為叢集控制器(Cluster Controller),它負責管理叢集,包括分配分割槽到 Broker、監控 Broker 故障等。在叢集內,一個分割槽由一個 Broker 負責,這個 Broker 也稱為這個分割槽的 Leader;當然一個分割槽可以被複制到多個 Broker 上來實現冗餘,這樣當存在 Broker 故障時可以將其分割槽重新分配到其他 Broker 來負責。下圖是一個樣例:

Broker 和叢集(Cluster)

概念三:主題(Topic)與分割槽(Partition)

主題(Topic)與分割槽(Partition)

在 Kafka 中,訊息以「主題」「Topic」)來分類,每一個主題都對應一個「「訊息佇列」」,這有點兒類似於資料庫中的表。但是如果我們把所有同類的訊息都塞入到一個“中心”佇列中,勢必缺少可伸縮性,無論是生產者/消費者數目的增加,還是訊息數量的增加,都可能耗盡系統的效能或儲存。

我們使用一個生活中的例子來說明:現在 A 城市生產的某商品需要運輸到 B 城市,走的是公路,那麼單通道的高速公路不論是在「A 城市商品增多」還是「現在 C 城市也要往 B 城市運輸東西」這樣的情況下都會出現「吞吐量不足」的問題。所以我們現在引入「分割槽」「Partition」)的概念,類似“允許多修幾條道”的方式對我們的主題完成了水平擴充套件。

四、Kafka 工作流程分析

4.1 Kafka 生產過程分析

4.1.1 寫入方式

producer 採用推(push)模式將訊息釋出到 broker,每條訊息都被追加(append)到分割槽(patition)中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障 kafka 吞吐率)

4.1.2 分割槽(Partition)

訊息傳送時都被髮送到一個 topic,其本質就是一個目錄,而 topic 是由一些 Partition Logs(分割槽日誌)組成,其組織結構如下圖所示:

我們可以看到,每個 Partition 中的訊息都是「有序」的,生產的訊息被不斷追加到 Partition log 上,其中的每一個訊息都被賦予了一個唯一的「offset」值。

「1)分割槽的原因」

  1. 方便在叢集中擴充套件,每個 Partition 可以通過調整以適應它所在的機器,而一個 topic 又可以有多個 Partition 組成,因此整個叢集就可以適應任意大小的資料了;
  2. 可以提高併發,因為可以以 Partition 為單位讀寫了。

「2)分割槽的原則」

  1. 指定了 patition,則直接使用;
  2. 未指定 patition 但指定 key,通過對 key 的 value 進行 hash 出一個 patition;
  3. patition 和 key 都未指定,使用輪詢選出一個 patition。
DefaultPartitioner類
publicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){
List<PartitionInfo>partitions=cluster.partitionsForTopic(topic);
intnumPartitions=partitions.size();
if(keyBytes==null){
intnextValue=nextValue(topic);
List<PartitionInfo>availablePartitions=cluster.availablePartitionsForTopic(topic);
if(availablePartitions.size()>0){
intpart=Utils.toPositive(nextValue)%availablePartitions.size();
returnavailablePartitions.get(part).partition();
}else{
//nopartitionsareavailable,giveanon-availablepartition
returnUtils.toPositive(nextValue)%numPartitions;
}
}else{
//hashthekeyBytestochooseapartition
returnUtils.toPositive(Utils.murmur2(keyBytes))%numPartitions;
}
}

4.1.3 副本(Replication)

同 一 個 partition 可 能 會 有 多 個 replication ( 對 應server.properties配 置 中 的default.replication.factor=N)。沒有 replication 的情況下,一旦 broker 宕機,其上所有 patition 的資料都不可被消費,同時 producer 也不能再將資料存於其上的 patition。引入 replication 之後,同一個 partition 可能會有多個 replication,而這時需要在這些 replication 之間選出一 個 leader,producer 和 consumer 只與這個 leader 互動,其它 replication 作為 follower 從 leader 中複製資料。

4.1.4 寫入流程

producer 寫入訊息流程如下:

1)producer 先從 zookeeper 的 "/brokers/.../state"節點找到該 partition 的 leader ;

2)producer 將訊息傳送給該 leader ;

3)leader 將訊息寫入本地 log ;

4)followers 從 leader pull 訊息,寫入本地 log 後向 leader 傳送 ACK ;

5)leader 收到所有 ISR 中的 replication 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset)並向 producer 傳送 ACK ;

4.2 Broker 儲存訊息

4.2.1 儲存方式

物理上把 topic 分成一個或多個 patition(對應server.properties中的num.partitions=3配 置),每個 patition 物理上對應一個資料夾(該資料夾儲存該 patition 的所有訊息和索引文 件),如下:

[root@hadoop102logs]$ll
drwxrwxr-x.2demodemo40968月614:37first-0
drwxrwxr-x.2demodemo40968月614:35first-1
drwxrwxr-x.2demodemo40968月614:37first-2

[root@hadoop102logs]$cdfirst-0
[root@hadoop102first-0]$ll
-rw-rw-r--.1demodemo104857608月614:3300000000000000000000.index
-rw-rw-r--.1demodemo2198月615:0700000000000000000000.log
-rw-rw-r--.1demodemo104857568月614:3300000000000000000000.timeindex
-rw-rw-r--.1demodemo88月614:37leader-epoch-checkpoint

4.2.2 儲存策略

無論訊息是否被消費,kafka 都會保留所有訊息。有兩種策略可以刪除舊資料:

  • 基於時間:log.retention.hours=168
  • 基於大小:log.retention.bytes=1073741824

需要注意的是,因為 Kafka 讀取特定訊息的時間複雜度為 O(1),即與檔案大小無關, 所以這裡刪除過期檔案與提高 Kafka 效能無關。

4.2.3 Zookeeper 儲存結構

注意:producer 不在 zk 中註冊,消費者在 zk 中註冊。

4.3 Kafka 消費過程分析

kafka 提供了兩套 consumer API:高階 Consumer API 和低階 Consumer API。

4.3.1 高階 API

「1)高階 API 優點」

  • 高階 API 寫起來簡單
  • 不需要自行去管理 offset,系統通過 zookeeper 自行管理。
  • 不需要管理分割槽,副本等情況,系統自動管理。
  • 消費者斷線會自動根據上一次記錄在 zookeeper 中的 offset 去接著獲取資料(預設設定 1 分鐘更新一下 zookeeper 中存的 offset)
  • 可以使用 group 來區分對同一個 topic 的不同程式訪問分離開來(不同的 group 記錄不同的 offset,這樣不同程式讀取同一個 topic 才不會因為 offset 互相影響)

「2)高階 API 缺點」

  • 不能自行控制 offset(對於某些特殊需求來說)
  • 不能細化控制如分割槽、副本、zk 等

4.3.2 低階 API

「1)低階 API 優點」

  • 能夠讓開發者自己控制 offset,想從哪裡讀取就從哪裡讀取。
  • 自行控制連線分割槽,對分割槽自定義進行負載均衡
  • 對 zookeeper 的依賴性降低(如:offset 不一定非要靠 zk 儲存,自行儲存 offset 即可, 比如存在檔案或者記憶體中)

「2)低階 API 缺點」

  • 太過複雜,需要自行控制 offset,連線哪個分割槽,找到分割槽 leader 等。

4.3.3 消費者組

消費者是以 consumer group 消費者組的方式工作,由一個或者多個消費者組成一個組, 共同消費一個 topic。每個分割槽在同一時間只能由 group 中的一個消費者讀取,但是多個 group 可以同時消費這個 partition。在圖中,有一個由三個消費者組成的 group,有一個消費者讀取主題中的兩個分割槽,另外兩個分別讀取一個分割槽。某個消費者讀取某個分割槽,也可以叫做某個消費者是某個分割槽的擁有者。

在這種情況下,消費者可以通過水平擴充套件的方式同時讀取大量的訊息。另外,如果一個消費者失敗了,那麼其他的 group 成員會自動負載均衡讀取之前失敗的消費者讀取的分割槽。

4.3.4 消費方式

consumer 採用 pull(拉)模式從 broker 中讀取資料。

push(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由 broker 決定的。它的目標是儘可能以最快速度傳遞訊息,但是這樣很容易造成 consumer 來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費訊息。

對於 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費 訊息的速率,同時 consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。

pull 模式不足之處是,如果 kafka 沒有資料,消費者可能會陷入迴圈中,一直等待資料 到達。為了避免這種情況,我們在我們的拉請求中有引數,允許消費者請求在等待資料到達 的“長輪詢”中進行阻塞(並且可選地等待到給定的位元組數,以確保大的傳輸大小)。

五、Kafka 安裝

5.1 安裝環境與前提條件

安裝環境:Linux

前提條件:

Linux系統下安裝好jdk 1.8以上版本,正確配置環境變數 Linux系統下安裝好scala 2.11版本

安裝ZooKeeper(注:kafka自帶一個Zookeeper服務,如果不單獨安裝,也可以使用自帶的ZK)

5.2 安裝步驟

Apache基金會開源的這些軟體基本上安裝都比較方便,只需要下載、解壓、配置環境變數三步即可完成,kafka也一樣,官網選擇對應版本下載後直接解壓到一個安裝目錄下就可以使用了,如果為了方便可以在~/.bashrc裡配置一下環境變數,這樣使用的時候就不需要每次都切換到安裝目錄了。

具體可參考:Kafka 叢集安裝與環境測試

5.3 測試

接下來可以通過簡單的console視窗來測試kafka是否安裝正確。

「(1)首先啟動ZooKeeper服務」

如果啟動自己安裝的ZooKeeper,使用命令zkServer.sh start即可。

如果使用kafka自帶的ZK服務,啟動命令如下(啟動之後shell不會返回,後續其他命令需要另開一個Terminal):

$cd/opt/tools/kafka#進入安裝目錄
$bin/zookeeper-server-start.shconfig/zookeeper.properties

「(2)第二步啟動kafka服務」

啟動Kafka服務的命令如下所示:

$cd/opt/tools/kafka#進入安裝目錄
$bin/kafka-server-start.shconfig/server.properties

「(3)第三步建立一個topic,假設為“test”」

建立topic的命令如下所示,其引數也都比較好理解,依次指定了依賴的ZooKeeper,副本數量,分割槽數量,topic的名字:

$cd/opt/tools/kafka#進入安裝目錄
$bin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topictest1

建立完成後,可以通過如下所示的命令檢視topic列表:

$bin/kafka-topics.sh--list--zookeeperlocalhost:2181

「(4)開啟Producer和Consumer服務」

kafka提供了生產者和消費者對應的console視窗程式,可以先通過這兩個console程式來進行驗證。

首先啟動Producer:

$cd/opt/tools/kafka#進入安裝目錄
$bin/kafka-console-producer.sh--broker-listlocalhost:9092--topictest

然後啟動Consumer:

$cd/opt/tools/kafka#進入安裝目錄
$bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topictest--from-beginning

在開啟生產者服務的終端輸入一些資料,回車後,在開啟消費者服務的終端能看到生產者終端輸入的資料,即說明kafka安裝成功。

六、Apache Kafka 簡單示例

6.1 建立訊息佇列

kafka-topics.sh--create--zookeeper192.168.56.137:2181--topictest--replication-factor1--partitions1

6.2 pom.xml

<!--https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>

6.3 生產者

packagecom.njbdqn.services;

importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.ProducerConfig;
importorg.apache.kafka.clients.producer.ProducerRecord;
importorg.apache.kafka.common.serialization.StringSerializer;

importjava.util.Properties;

/**
*@author:Tokgo J
*@date:2020/9/11
*@aim:生產者:往test訊息佇列寫入訊息
*/

publicclassMyProducer{
publicstaticvoidmain(String[]args){
//定義配置資訊
Propertiesprop=newProperties();
//kafka地址,多個地址用逗號分割"192.168.23.76:9092,192.168.23.77:9092"
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
KafkaProducer<String,String>prod=newKafkaProducer<String,String>(prop);

//傳送訊息
try{
for(inti=0;i<10;i++){
//生產者記錄訊息
ProducerRecord<String,String>pr=newProducerRecord<String,String>("test","helloworld"+i);
prod.send(pr);
Thread.sleep(500);
}
}catch(InterruptedExceptione){
e.printStackTrace();
}finally{
prod.close();
}
}
}

注意:

  1. kafka如果是叢集,多個地址用逗號分割(,);
  2. Properties的put方法,第一個引數可以是字串,如:p.put("bootstrap.servers","192.168.23.76:9092");
  3. kafkaProducer.send(record)可以通過返回的Future來判斷是否已經發送到kafka,增強訊息的可靠性。同時也可以使用send的第二個引數來回調,通過回撥判斷是否傳送成功;
  4. p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);設定序列化類,可以寫類的全路徑。

6.4 消費者

packagecom.njbdqn.services;

importorg.apache.kafka.clients.consumer.ConsumerConfig;
importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.apache.kafka.clients.consumer.ConsumerRecords;
importorg.apache.kafka.clients.consumer.KafkaConsumer;
importorg.apache.kafka.common.serialization.StringDeserializer;
importorg.apache.kafka.common.serialization.StringSerializer;

importjava.time.Duration;
importjava.util.Arrays;
importjava.util.Collections;
importjava.util.Properties;

/**
*@author:Tokgo J
*@date:2020/9/11
*@aim:消費者:讀取kafka資料
*/

publicclassMyConsumer{
publicstaticvoidmain(String[]args){
Propertiesprop=newProperties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

prop.put("session.timeout.ms","30000");
//消費者是否自動提交偏移量,預設是true避免出現重複資料設為false
prop.put("enable.auto.commit","false");
prop.put("auto.commit.interval.ms","1000");
//auto.offset.reset消費者在讀取一個沒有偏移量的分割槽或者偏移量無效的情況下的處理
//earliest在偏移量無效的情況下消費者將從起始位置讀取分割槽的記錄
//latest在偏移量無效的情況下消費者將從最新位置讀取分割槽的記錄
prop.put("auto.offset.reset","earliest");

//設定組名
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"group");

KafkaConsumer<String,String>con=newKafkaConsumer<String,String>(prop);

con.subscribe(Collections.singletonList("test"));

while(true){
ConsumerRecords<String,String>records=con.poll(Duration.ofSeconds(100));
for(ConsumerRecord<String,String>rec:records){
System.out.println(String.format("offset:%d,key:%s,value:%s",rec.offset(),rec.key(),rec.value()));

}
}
}
}

注意:

  1. 訂閱訊息可以訂閱多個主題;
  2. ConsumerConfig.GROUP_ID_CONFIG表示消費者的分組,kafka根據分組名稱判斷是不是同一組消費者,同一組消費者去消費一個主題的資料的時候,資料將在這一組消費者上面輪詢;
  3. 主題涉及到分割槽的概念,同一組消費者的個數不能大於分割槽數。因為:一個分割槽只能被同一群組的一個消費者消費。出現分割槽小於消費者個數的時候,可以動態增加分割槽;
  4. 注意和生產者的對比,Properties中的key和value是反序列化,而生產者是序列化。

七、參考

朱小廝:《深入理解Kafka:核心設計與實踐原理》

宇宙灣:《Apache Kafka 分散式訊息佇列框架》

需要上述參考資料或者是想更多kafka相關參考資料的讀者可以關注公眾號【Java鬥帝】回覆666 即可免費獲取

看完三件事❤️

========

如果你覺得這篇內容對你還蠻有幫助,我想邀請你幫我三個小忙:

點贊,轉發,有你們的 『點贊和評論』,才是我創造的動力。

關注公眾號 『 Java鬥帝 』,不定期分享原創知識。

同時可以期待後續文章ing