18道kafka高頻面試題哪些你還不會?(含答案和思維導圖)
阿新 • • 發佈:2019-12-23
前言
Kafka是最初由Linkedin公司開發,是一個分散式、支援分割槽的(partition)、多副本的(replica),基於zookeeper協調的分散式訊息系統,它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,訊息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源 專案。
關於Kafka的知識總結了個思維導圖
kafka 面試題
1、如何獲取 topic 主題的列表
2、生產者和消費者的命令列是什麼?
3、consumer 是推還是拉?
4、講講 kafka 維護消費狀態跟蹤的方法
5、講一下主從同步
6、為什麼需要訊息系統,mysql 不能滿足需求嗎?
7、Zookeeper 對於 Kafka 的作用是什麼?
8、資料傳輸的事務定義有哪三種?
9、Kafka 判斷一個節點是否還活著有那兩個條件?
10、Kafka 與傳統 MQ 訊息系統之間有三個關鍵區別
11、講一講 kafka 的 ack 的三種機制
13、消費者故障,出現活鎖問題如何解決?
14、如何控制消費的位置
15、kafka 分散式(不是單機)的情況下,如何保證訊息的順序消費?
16、kafka 的高可用機制是什麼?
17、kafka 如何減少資料丟失
18、kafka 如何不消費重複資料?比如扣款,我們不能重複的扣。
1、如何獲取 topic 主題的列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
2、生產者和消費者的命令列是什麼?
生產者在主題上釋出訊息:
bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topicHello-Kafka
注意這裡的 IP 是 server.properties 中的 listeners 的配置。接下來每個新行就是輸入一條新訊息。
消費者接受訊息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topicHello-Kafka --from-beginning
3、consumer 是推還是拉?
Kafka 最初考慮的問題是,customer 應該從 brokes 拉取訊息還是 brokers 將訊息推送到 consumer,也就是 pull 還 push。在這方面,Kafka 遵循了一種大部分訊息系統共同的傳統的設計:producer 將訊息推送到 broker,consumer 從broker 拉取訊息。
一些訊息系統比如 Scribe 和 Apache Flume 採用了 push 模式,將訊息推送到下游的 consumer。這樣做有好處也有壞處:由 broker 決定訊息推送的速率,對於不同消費速率的 consumer 就不太好處理了。訊息系統都致力於讓 consumer 以最大的速率最快速的消費訊息,但不幸的是,push 模式下,當 broker 推送的速率遠大於 consumer 消費的速率時,consumer 恐怕就要崩潰了。最終 Kafka 還是選取了傳統的 pull 模式。
Pull 模式的另外一個好處是 consumer 可以自主決定是否批量的從 broker 拉取資料 。Push 模式必須在不知道下游 consumer 消費能力和消費策略的情況下決定是立即推送每條訊息還是快取之後批量推送。如果為了避免 consumer 崩潰而採用較低的推送速率,將可能導致一次只推送較少的訊息而造成浪費。Pull 模式下,consumer 就可以根據自己的消費能力去決定這些策略。
Pull 有個缺點是,如果 broker 沒有可供消費的訊息,將導致 consumer 不斷在迴圈中輪詢,直到新訊息到 t 達。為了避免這點,Kafka 有個引數可以讓 consumer阻塞知道新訊息到達(當然也可以阻塞知道訊息的數量達到某個特定的量這樣就可以批量傳送)。
4、講講 kafka 維護消費狀態跟蹤的方法
大部分訊息系統在 broker 端的維護訊息被消費的記錄:一個訊息被分發到consumer 後 broker 就馬上進行標記或者等待 customer 的通知後進行標記。這樣也可以在訊息在消費後立馬就刪除以減少空間佔用。
但是這樣會不會有什麼問題呢?如果一條訊息傳送出去之後就立即被標記為消費過的,旦 consumer 處理訊息時失敗了(比如程式崩潰)訊息就丟失了。為了解決這個問題,很多訊息系統提供了另外一個個功能:當訊息被髮送出去之後僅僅被標記為已傳送狀態,當接到 consumer 已經消費成功的通知後才標記為已被消費的狀態。這雖然解決了訊息丟失的問題,但產生了新問題,首先如果 consumer處理訊息成功了但是向 broker 傳送響應時失敗了,這條訊息將被消費兩次。第二個問題時,broker 必須維護每條訊息的狀態,並且每次都要先鎖住訊息然後更改狀態然後釋放鎖。這樣麻煩又來了,且不說要維護大量的狀態資料,比如如果訊息傳送出去但沒有收到消費成功的通知,這條訊息將一直處於被鎖定的狀態,Kafka 採用了不同的策略。Topic 被分成了若干分割槽,每個分割槽在同一時間只被一個 consumer 消費。這意味著每個分割槽被消費的訊息在日誌中的位置僅僅是一個簡單的整數:offset。這樣就很容易標記每個分割槽消費狀態就很容易了,僅僅需要一個整數而已。這樣消費狀態的跟蹤就很簡單了。
這帶來了另外一個好處:consumer 可以把 offset 調成一個較老的值,去重新消費老的訊息。這對傳統的訊息系統來說看起來有些不可思議,但確實是非常有用的,誰規定了一條訊息只能被消費一次呢?
5、講一下主從同步
Kafka允許topic的分割槽擁有若干副本,這個數量是可以配置的,你可以為每個topci配置副本的數量。Kafka會自動在每個個副本上備份資料,所以當一個節點down掉時資料依然是可用的。
Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當於只有一份資料。
6、為什麼需要訊息系統,mysql 不能滿足需求嗎?
(1)解耦:
允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。
(2)冗餘:
訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。許多訊息佇列所採用的”插入-獲取-刪除”正規化中,在把一個訊息從佇列中刪除之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的儲存直到你使用完畢。
(3)擴充套件性:
因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
(4)靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
(5)可恢復性:
系統的一部分元件失效時,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。
(6)順序保證:
在大多使用場景下,資料處理的順序都很重要。大部分訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。(Kafka 保證一個 Partition 內的訊息的有序性)
(7)緩衝:
有助於控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況。
(8)非同步通訊:
很多時候,使用者不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。
7、Zookeeper 對於 Kafka 的作用是什麼?
Zookeeper 是一個開放原始碼的、高效能的協調服務,它用於 Kafka 的分散式應用。
Zookeeper 主要用於在叢集中不同節點之間進行通訊
在 Kafka 中,它被用於提交偏移量,因此如果節點在任何情況下都失敗了,它都可以從之前提交的偏移量中獲取除此之外,它還執行其他活動,如: leader 檢測、分散式同步、配置管理、識別新節點何時離開或連線、叢集、節點實時狀態等等。
8、資料傳輸的事務定義有哪三種?
和 MQTT 的事務定義一樣都是 3 種。
(1)最多一次: 訊息不會被重複傳送,最多被傳輸一次,但也有可能一次不傳輸
(2)最少一次: 訊息不會被漏傳送,最少被傳輸一次,但也有可能被重複傳輸.
(3)精確的一次(Exactly once): 不會漏傳輸也不會重複傳輸,每個訊息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的
9、Kafka 判斷一個節點是否還活著有那兩個條件?
(1)節點必須可以維護和 ZooKeeper 的連線,Zookeeper 通過心跳機制檢查每個節點的連線
(2)如果節點是個 follower,他必須能及時的同步 leader 的寫操作,延時不能太久
10、Kafka 與傳統 MQ 訊息系統之間有三個關鍵區別
(1).Kafka 持久化日誌,這些日誌可以被重複讀取和無限期保留
(2).Kafka 是一個分散式系統:它以叢集的方式執行,可以靈活伸縮,在內部通過複製資料提升容錯能力和高可用性
(3).Kafka 支援實時的流式處理
11、講一講 kafka 的 ack 的三種機制
request.required.acks 有三個值 0 1 -1(all)
0:生產者不會等待 broker 的 ack,這個延遲最低但是儲存的保證最弱當 server 掛掉的時候就會丟資料。
1:服務端會等待 ack 值 leader 副本確認接收到訊息後傳送 ack 但是如果 leader掛掉後他不確保是否複製完成新 leader 也會導致資料丟失。
-1(all):服務端會等所有的 follower 的副本受到資料後才會受到 leader 發出的ack,這樣資料不會丟失
12、消費者如何不自動提交偏移量,由應用提交?
將 auto.commit.offset 設為 false,然後在處理一批訊息後 commitSync() 或者非同步提交 commitAsync()
即:
ConsumerRecords<> records = consumer.poll(); for (ConsumerRecord<> record : records){ 。。。 tyr{ consumer.commitSync() } 。。。 }
13、消費者故障,出現活鎖問題如何解決? 出現“活鎖”的情況,是它持續的傳送心跳,但是沒有處理。為了預防消費者在這種情況下一直持有分割槽,我們使用 max.poll.interval.ms 活躍檢測機制。 在此基礎上,如果你呼叫的 poll 的頻率大於最大間隔,則客戶端將主動地離開組,以便其他消費者接管該分割槽。 發生這種情況時,你會看到 offset 提交失敗(呼叫commitSync()引發的 CommitFailedException)。這是一種安全機制,保障只有活動成員能夠提交 offset。所以要留在組中,你必須持續呼叫 poll。 消費者提供兩個配置設定來控制 poll 迴圈: max.poll.interval.ms:增大 poll 的間隔,可以為消費者提供更多的時間去處理返回的訊息(呼叫 poll(long)返回的訊息,通常返回的訊息都是一批)。缺點是此值越大將會延遲組重新平衡。 max.poll.records:此設定限制每次呼叫 poll 返回的訊息數,這樣可以更容易的預測每次 poll 間隔要處理的最大值。通過調整此值,可以減少 poll 間隔,減少重新平衡分組的 對於訊息處理時間不可預測地的情況,這些選項是不夠的。 處理這種情況的推薦方法是將訊息處理移到另一個執行緒中,讓消費者繼續呼叫 poll。 但是必須注意確保已提交的 offset 不超過實際位置。另外,你必須禁用自動提交,並只有在執行緒完成處理後才為記錄手動提交偏移量(取決於你)。 還要注意,你需要 pause 暫停分割槽,不會從 poll 接收到新訊息,讓執行緒處理完之前返回的訊息(如果你的處理能力比拉取訊息的慢,那建立新執行緒將導致你機器記憶體溢位)。 14、如何控制消費的位置 kafka 使用 seek(TopicPartition, long)指定新的消費位置。用於查詢伺服器保留的最早和最新的 offset 的特殊的方法也可用(seekToBeginning(Collection) 和seekToEnd(Collection)) 15、kafka 分散式(不是單機)的情況下,如何保證訊息的順序消費? Kafka 分散式的單位是 partition,同一個 partition 用一個 write ahead log 組織,所以可以保證 FIFO 的順序。不同 partition 之間不能保證順序。但是絕大多數使用者都可以通過 message key 來定義,因為同一個 key 的 message 可以保證只發送到同一個 partition。 Kafka 中傳送 1 條訊息的時候,可以指定(topic, partition, key) 3 個引數。partiton 和 key 是可選的。如果你指定了 partition,那就是所有訊息發往同 1個 partition,就是有序的。並且在消費端,Kafka 保證,1 個 partition 只能被1 個 consumer 消費。或者你指定 key( 比如 order id),具有同 1 個 key 的所有訊息,會發往同 1 個 partition。 16、kafka 的高可用機制是什麼? 這個問題比較系統,回答出 kafka 的系統特點,leader 和 follower 的關係,訊息讀寫的順序即可。 17、kafka 如何減少資料丟失 Kafka到底會不會丟資料(data loss)? 通常不會,但有些情況下的確有可能會發生。下面的引數配置及Best practice列表可以較好地保證資料的永續性(當然是trade-off,犧牲了吞吐量)。
- block.on.buffer.full = true
- acks = all
- retries = MAX_VALUE
- max.in.flight.requests.per.connection = 1
- 使用KafkaProducer.send(record, callback)
- callback邏輯中顯式關閉producer:close(0)
- unclean.leader.election.enable=false
- replication.factor = 3
- min.insync.replicas = 2
- replication.factor > min.insync.replicas
- enable.auto.commit=false
- 訊息處理完成之後再提交位移