1. 程式人生 > >Kafka基礎-內部原理

Kafka基礎-內部原理

理解Kafka的內部原理可以有助於故障的排除,因此本文會著重介紹以下三個部分:

  • Kafka複製的工作原理
  • Kafka是怎樣處理生產者和消費者的請求
  • Kafka是怎樣儲存資料,例如檔案格式和索引

1. 叢集成員

Kafka使用Apache Zookeeper來維護當前叢集的成員列表,每個broker都有一個唯一的標識,該標識可以在broker的配置檔案設定或者自動生成。每次broker程序啟動時,它都會通過建立一個臨時節點在Zookeeper註冊自己的ID,在Zookeeper對應的路徑是/brokers/ids。

當broker與Zookeeper斷開連線時(通常是由於停止broker,但也可能是由於網路故障),broker在啟動時建立的臨時節點將會自動從Zookeeper刪除。監控叢集broker列表的Kafka元件將被通知該broker已經離線。當停止broker時,即使代表該broker的臨時節點已經被刪除,該broker的ID仍然會存在於其它資料結構中。例如,每個topic的副本列表都會包含副本的broker IDs。如果一個broker已經完全離線,然後使用相同的ID啟動一個新的broker,那麼這個broker會立即加入到離線broker原來的叢集位置,並且為其分配相同的topics和分割槽。

2. 控制器(Controller)

控制器是由其中一個Kafka broker充當,除了負責通常的broker功能外,還負責選舉分割槽leaders。叢集中第一個啟動的broker將成為控制器,它會在ZooKeeper建立路徑/controller的臨時節點。當其它brokers啟動時,它們也會嘗試建立此臨時節點,但會收到“node already exists”的異常。每個broker會建立一個監視器,用於監視控制器的狀態,以便它們收到控制器更改的通知,從而保證叢集只有一個控制器。

當控制器停止或者與Zookeeper斷開連線時,之前建立的臨時節點會被刪除,其它brokers會通過監視器知道控制器已經下線,然後會嘗試建立路徑/controller的臨時節點,第一個建立成功的broker將成為新的控制器,其餘的brokers同樣會收到“node already exists”的異常並將重新建立針對新控制器的監視器。每次新控制器的選舉,它都會通過Zookeeper的條件遞增操作接收到一個比原來更大的epoch號。如果brokers從一個具有舊的epoch號的控制器接收到訊息,它們會知道忽略它。

當控制器發現一個broker離開叢集時(通過監視相關的Zookeeper路徑),如果這個broker是一個leader,那麼控制器會遍歷這個broker對應的所有followers,從而決定新的leader(簡單地從對應分割槽的副本列表裡面選擇下一個副本),並向新的leader和那些followers傳送一個包含新leader和分割槽followers列表的資訊。新的leader會繼續處理生產者和消費者的請求,而那些followers會從新的leader複製訊息。當控制器發現一個broker加入叢集時,它會使用這個broker的ID來檢查此broker是否存在副本。如果有,控制器會通知新的和現有的brokers做相應的資訊更新,新的broker會開始從現有的leader複製訊息。

總之,Kafka使用Zookeeper的臨時節點功能來選舉控制器和在brokers加入或離開叢集時通知控制器。每當控制器發現brokers加入或離開叢集時,它就負責在分割槽和副本中選舉新的leaders。控制器使用epoch號來防止多個節點都認為自己是當前控制器,也就是所謂的“split brain”場景。

3. 複製(Replication)

複製是Kafka架構的核心,因為它是在單個節點不可避免地發生故障時保證可用性和永續性的方法。正如已經提到過的,Kafka的訊息是按topics來組織。每個topic都是分割槽的,每個分割槽可以有多個副本。副本也是儲存在brokers裡,每個broker通常可以儲存成百上千個屬於不同topics和分割槽的副本。

Kafka有兩種型別的副本:

  • Leader,每個分割槽都有一個設計為leader的副本,該副本稱為leader。為了保證一致性,所有生產者和消費者的請求都會發送給這個leader。
  • Follower,非leader分割槽的所有副本稱為followers。此類副本不負責處理客戶的請求,它們唯一的工作是從leader複製訊息,與之保持同步最新的訊息。在一個分割槽leader故障時,其中一個follower將會被選舉為新的分割槽leader。

Leader負責的另一項任務是瞭解有哪些followers與之保持同步最新的訊息。在訊息傳送到leader時,followers會通過從leader複製所有資訊來嘗試保持最新的訊息,但由於各種原因可能會失敗,例如網路阻塞導致複製減慢或者當一個broker故障導致它所有followers無法同步訊息,複製會滯後,直到這個broker恢復為止。

為了與leader保持同步,副本會向leader傳送Fetch的請求,這類請求與消費者向broker傳送的讀取訊息請求的型別是一樣的,leader會向副本傳送需要同步的訊息作為那些請求的響應。那些Fetch請求包含副本希望接下來要讀取訊息的offset,並且將始終按順序排列。

副本會先請求訊息1,然後訊息2,然後訊息3,它在讀取所有先前的訊息之前不會請求訊息4。這意味著當副本請求訊息4時,leader可以知道這個副本已經讀取訊息3及之前的所有訊息。根據每個副本請求的最後offset,leader可以知道每個副本訊息同步的落後情況。一個follower在被認為脫離同步之前可以處於非活動或落後狀態的時間由replica.lag.time.max.ms配置設定,預設為10000ms=10秒,這允許的滯後會對消費者和在選舉期間的資料儲存有影響(會在後續介紹)。如果一個副本超過10秒都沒有請求同步訊息或者它已請求同步訊息但超過10秒都沒有同步到最新的訊息,那麼這個副本將被認為“脫離同步”(out of sync)。如果一個副本不能跟上同步leader的訊息,那麼這個副本在leader故障的時候不能被選舉為新的leader,畢竟這個副本沒有包含完整的訊息,該副本會被leader移除。與此相反的是,一直請求同步最新訊息的副本被稱為“in-sync”副本,簡稱ISR。在leader故障的時候,只有該類副本才有資格被選舉為新的leader。

每個分割槽都有一個首選的leader-就是在最初建立topic時的leader。之所以是首選是因為當首次建立分割槽時,leaders在brokers之間是負載均衡的。因此,當首選leader確實是當前的leader時,負載將會均分到各個broker。預設地,Kafka設定了auto.leader.rebalance.enable=true,它會啟動一個background的執行緒檢查首選的leader是不是當前的leader,如果不是,而且首選leader處於“in-sync”狀態,則會觸發leader選舉把首選leader選舉為當前leader。

4. 處理請求

Kafka broker的工作大多數是處理從客戶端、分割槽副本和控制器傳送到分割槽leader的請求。Kafka有一個指定請求和返回格式的二進位制協議,標準的header包括:

  • 請求型別(也稱為API key)
  • 請求版本(brokers可以處理不同版本的客戶請求並作出相應的返回)
  • 相關ID:請求的唯一標識,也出現在返回和錯誤日誌中(ID用於故障排除)
  • 客戶端ID: 用於區分發送請求的應用

對於broker監聽的每個埠,broker啟動一個建立一個連線的acceptor執行緒,並將其交給processor執行緒進行處理。processor執行緒(也稱為network執行緒)數量是可配置的。processor執行緒負責接收客戶連線請求,把它們放到一個請求佇列中,和從響應佇列中讀取返回並將它們傳送回客戶端。如下圖所示:

produce請求和fetch請求都必須傳送到分割槽leader。如果一個broker接收到指定分割槽的produce/fetch請求,而那個分割槽的leader在不同的broker裡,傳送請求的客戶端將收到“Not a Leader for Partition.”(不是分割槽leader)的錯誤返回。Kafka的客戶端負責向包含請求相關分割槽的leader傳送produce/fetch請求。客戶端如何知道向哪個broker傳送請求呢?Kafka客戶端使用另一種型別稱為metadata的請求,該類請求包含客戶端感興趣的一個topic列表。伺服器返回包含topics中的分割槽資訊,每個分割槽的副本資訊和leader的資訊。Metadata請求可以傳送到任意的broker,因為所有的brokers都有包含這些資訊的metadata快取。

客戶端通常也會快取這些資訊並使用它來主導傳送produce/fetch請求到每個分割槽的相應broker。它們還需要偶爾更新這些資訊(更新間隔由metadata.max.age.ms配置設定,預設為300000ms=5分鐘),方法是傳送另外一個metadata請求,以便它們知道topic的metadata資訊是否有更新。此外,如果客戶端收到“Not a Leader for Partition.”錯誤,它會在再次傳送請求之前更新其metadata快取。

4.1 Produce請求

當一個包含分割槽leader的broker接收到一個produce請求時,它先會執行以下幾個驗證:

  • 傳送請求的使用者是否有該topic的寫許可權?
  • 請求指定的acks設定是否有效?(允許的值僅為0、1和all)
  • 如果acks設定為all,是否有足夠的“in-sync”副本用於安全地寫入訊息?(如果“in-sync”副本的數量低於設定的數量,brokers可以配置為拒絕寫入新訊息)

然後broker會把新訊息寫入到本地磁碟。在Linux系統,訊息會先被寫入到檔案系統的快取,並且無法保證何時被寫入磁碟。Kafka不會等待訊息持久化到磁碟,它依賴於複製來保持訊息的永續性。

一旦訊息被寫入到分割槽leader,這個broker就會檢查acks的配置,如果acks設定為0或1,這個broker會馬上向客戶端返回資訊;如果acks設定為all,請求會被儲存在一個被稱為purgatory的緩衝區,直到這個leader知道followers已經複製了訊息,到那個時候才會向客戶端傳送返回資訊。

4.2 Fetch請求

Brokers處理fetch請求的方式和處理produce請求的方式是非常相似的。客戶端傳送一個請求,要求broker從某個topic的分割槽傳送指定offset的訊息,例如“請將topic Test的分割槽0中的offset為53開始的訊息和分割槽3中的offset為64開始的訊息傳送給我”。客戶端還可以限制broker每個分割槽返回的資料量(max.partition.fetch.bytes=1048576=1MB),這個限制很重要,因為客戶端需要分配儲存從broker返回資訊的記憶體大小。如果沒有這個限制,brokers可以返回過大的資訊導致客戶端記憶體不足。

正如之前提到的,請求必須傳送到指定的分割槽leader,並且客戶端需要傳送必要的metadata請求以確保fetch請求能被正確地路由。當leader接收到請求時,首先會檢查請求是否有效 - 分割槽是否存在指定的offset?如果客戶端請求的訊息太舊而且已經從該分割槽刪除,或者指定的offset不存在,這個broker會返回錯誤。

如果指定的offset存在,這個broker會從分割槽相應的offset開始讀取訊息,直到大小達到客戶端在請求設定的限制,然後把訊息傳送給客戶端。Kafka使用了zero-copy的方法來發送訊息到客戶端,這意味著Kafka從檔案(Linux檔案系統快取)讀取訊息後直接傳送到網路通道,而沒有使用任何中間緩衝區。這與大多數資料庫不同,在大多數資料庫中,資料在傳送到客戶端之前是先儲存在本地快取中。這種技術消除了複製位元組和管理記憶體緩衝區的開銷,從而大大提高了效能。

除了可以設定broker返回資料量的上限之外,客戶端還可以設定返回資料量的下限(fetch.min.bytes=1)。例如,把下限設定為10KB,意思是“一旦broker有10KB資料才返回”。這是當客戶端從流量不多的topic讀取訊息時減少CPU和網路使用率的好方法。該方法是客戶端傳送一個請求,broker會先等待,直到有足夠多的資料才返回(如下圖所示),而不是客戶端每隔幾毫秒就向brokers傳送請求,然後返回非常少的資料或者沒有資料。兩種方法讀取的資料量總體上是相同的,但前者來回的次數少很多,因此開銷較少。

當然,我們不希望客戶端一直等待broker有足夠多的資料。因此,客戶端還可以定義一個超時時間(fetch.max.wait.ms=500)來告訴broker“如果在500毫秒內沒有達到最小的資料量,則只需要傳送現有的資料”。

值得注意的是,並不是所有存在於該分割槽leader的資料都可供客戶端讀取,大多數客戶端只能讀取寫入所有“in-sync”副本的訊息。上述已經介紹過分割槽leader知道哪些訊息被複制到哪些副本,在一個訊息被寫入到所有“in-sync”副本之前,該訊息將不會被髮送給消費者。嘗試讀取那些訊息會返回空而不是錯誤。這種設計的原因是,沒有被複制到足夠的副本的訊息是“不安全的”。例如,一個leader崩潰,另外一個副本被選舉為新的leader,但這個新的leader沒有完成訊息的同步,那麼訊息會丟失。因此,客戶端需要等待訊息被寫入到所有“in-sync”副本才能讀取(如下圖所示)。如果由於某些原因導致複製緩慢,消費者讀取訊息的時間會變長。允許複製延時的時間由replica.lag.time.max.ms設定,預設是10000ms=10秒,在此範圍內,副本仍會被認為是“in-sync”的。

4.3 其它請求

除了已經介紹過的Produce/Fetch/Metadata請求之外,Kafka還可以處理其它型別的請求,當前支援20種類型。例如,當控制器宣佈分割槽有一個新的leader時,它會向這個新的leader和所有followers傳送一個LeaderAndIsr請求,因此新的leader就會知道開始處理客戶端的請求,followers知道去同步新的leader。

在新版本的實現中,Kafka不再使用Zookeeper來儲存offsets資訊,而是使用一個特別的topic。因此,Kafka新增了OffsetCommitRequest、OffsetFetchRequest和ListOffsetsRequest請求。當呼叫客戶端commitOffset()的API時,不再把offsets寫入到Zookeeper,而是向Kafka傳送OffsetCommitRequest請求把offsets寫入到一個特別的topic。

Topic的建立現在還是由命令列工具完成,命令列工具直接更新Zookeeper中的topics列表,brokers會監控這個列表得知新的topics被建立。在新版本的實現中,會新增CreateTopicRequest允許所有客戶端直接向brokers傳送請求建立topics。

除了新增支援新的請求型別,有時還會修改現有的請求新增一些功能。例如,在Kafka 0.9.0和0.10.0版本期間,決定通過增加資訊到Metadata的返回讓客戶端知道當前控制器的資訊。在釋出的0.10.0版本中,新增ApiVersionRequest請求,允許客戶端詢問broker支援每個請求的版本並相應地使用正確的版本。

5. 物理儲存

Kafka的基本儲存單元是分割槽副本。分割槽不能在多個brokers間分割,甚至不能在同一個broker的多個磁碟間分割。因此,分割槽的大小是由單個掛載點(如果使用JBOD配置,則是單個磁碟,如果使用RAID配置,則是多個磁碟)上的可用空間限制。當配置Kafka時,管理員需要定義儲存分割槽的目錄列表,使用的配置是log.dirs,如果沒有設定會使用log.dir,預設路徑是/tmp/kafka-logs。

5.1 分配分割槽

當建立一個topic時,Kafka首先會決定怎樣在brokers之間分配分割槽。假設有6個brokers,並且決定建立一個包含10個分割槽且複製因子為3的topic。所以現在要把30個分割槽副本分配給6個brokers。當進行分配時,我們的目標是:

  • 平均分配,本例就是每個broker分配5個副本。
  • 確保每個分割槽的每個副本都在不同的broker。如果分割槽0的leader被分配到broker2,那麼可以把2個followers分配到broker 3和4,但不能再分配到broker2,也不能都分配到broker3。
  • 如果brokers有機架資訊(在Kafka0.10.0及更高版本可用),儘可能把每個分割槽的副本分配到不同的機架上。這樣可以確保即使整個機架都故障的情況下都不會導致分割槽完全不可用。

為了達到此目標,從隨機一個broker(例如是broker4)開始,並以迴圈的方式把分割槽分配給每個broker,以確定leaders的位置。因此分割槽0的leader將分配到broker4,分割槽1的leader將分配到broker5,分割槽2的leader將分配到broker0,依此類推。然後,對每個分割槽,依次把副本分配到leader所在的broker的下一個。例如,分割槽0的leader在broker4,它的第一個follower將在broker5,第二個follower在broker0。分割槽1的leader在broker5,它的第一個follower將在broker0,第二個follower在broker1,依此類推。

當考慮機架感知時,不是按數字順序選擇brokers,而是準備一個機架交替的broker列表。假設broker0,1和2在同一個機架上,而broker3,4和5在另外一個機架上。不按0到5的順序選擇brokers,而是按0,3,1,4,2,5交替的順序,每個broker後面是另一個機架的broker。在這種情況下,如果分割槽0的leader在broker4,它的第一個follower將在不同機架的broker2。如果第一個機架離線,還有副本在另外一個機架,因此,分割槽仍然可用。這適用於所有分割槽副本,因此可以保證在機架故障情況下的分割槽可用性。

一旦為每個分割槽和副本選擇了正確的brokers,就可以決定將哪個目錄用於新的分割槽。使用的規則非常簡單:計算每個目錄上的分割槽數量,把新的分割槽儲存到具有最少分割槽的目錄中。這意味著如果增加新的磁碟,所有新的分割槽都會儲存在新的磁碟。這是因為,在目錄分割槽數均衡前,新的磁碟始終具有最少的分割槽。

需要注意的是,分配分割槽的規則只會考慮目錄的分割槽數量,而不會考慮磁碟的可用空間或現有負載,也不會考慮分割槽的大小。這意味著如果某些brokers具有比其它brokers更多的磁碟空間(可能是因為群集是舊伺服器和新伺服器的混合),某些分割槽非常大,或者在同一個broker有不同大小的磁碟,則在分配分割槽的時候需要非常小心。

5.2 檔案管理

Retention是Kafka的一個重要概念,Kafka不會永久儲存資料,也不會等待所有消費者讀取完訊息後才刪除資料。Kafka管理員需要為每個topic配置資料的保留期,可以是按時間(log.retention.hours=168=7天),也可以是按大小(log.retention.bytes=-1)。

因為在一個大檔案裡面查詢和刪除訊息是既耗時又容易出錯,所以會把每個分割槽分割成segments。預設地,每個segment包含1GB(log.segment.bytes)的資料或1周(log.roll.hours=168=7天)的資料,以較小者為準。當broker正在寫資料到分割槽時,如果達到segment的上限,就會新建立一個segment。

正在寫入資料的segment稱為活動的segment,它永遠不會被刪除。因此如果設定retention為只保留1天資料但每個segment按5天資料分割,那麼資料會被保留5天,因為segment在關閉前是無法被刪除的。如果配置保留1週數據和每個segment按1天資料分割,那麼每天刪除最舊的segment的同時會建立一個新的segment,因為大部分時間分割槽會有7個segments。

5.3 檔案格式

每個segment都是儲存在單個數據檔案中,每個檔案儲存了訊息及其offsets。磁碟上的資料格式和從生產者傳送到broker以及稍後從broker傳送給消費者的格式是一樣的。使用相同的訊息格式允許Kafka在向消費者傳送訊息時使用zero-copy優化,而且還避免瞭解壓縮和重新壓縮訊息。

每個訊息除了包含key、value和offset,還包含其它資訊,例如是訊息的大小,用於檢測損壞的checksum code,表示訊息格式版本的magic byte,壓縮編解碼器(Snappy,GZip或LZ4),和timestamp(在0.10.0版本新增)。這個timestamp由生產者在傳送訊息或者broker在接收到訊息時提供,具體取決於配置。

如果生產者傳送的訊息是壓縮的,則在同一批處理的所有訊息都會壓縮在一起並作為一個“wrapper message”的值來發送(如下圖所示)。因此,broker只會收到單個訊息,消費者也相應會收到單個訊息。但當消費者解壓這個訊息時會得到同一批的所有訊息,每個訊息都有各自的timestamps和offsets。

Kafka brokers內建DumpLogSegment工具,允許檢視檔案系統的分割槽segment和檢查其內容-每個訊息的offset,checksum,magic byte,大小和壓縮編解碼器。命令如下:

bin/kafka-run-class.sh kafka.tools.DumpLogSegments

如果指定引數--deep-iteration,可以顯示上述提到的壓縮在訊息裡面的“wrapper message”。

5.4 索引

Kafka允許消費者從任何可用的offset開始讀取訊息。這意味著如果消費者需要從offset 100開始讀取1MB訊息,broker必須能夠快速地定位offset為100的訊息(可以在分割槽的任何segments中)並開始讀取訊息。為了幫助brokers快速定位指定offset的訊息,Kafka維護每個分割槽的索引。這個索引儲存了offsets和segment檔案、位置的對映關係。

索引也分多個segments來儲存,因此在刪除訊息時可以同時刪除舊的索引條目。如果索引損壞,只需要重新讀取訊息,記錄offsets和位置,就可以從匹配的日誌segment重新生成。如果需要,管理員刪除索引segments也是完全安全的,索引會被自動重新建立。

5.5 Compaction(這裡我翻譯為精煉)介紹

根據上述介紹,Kafka會根據設定的保留期來儲存資料。但是,如果要儲存類似客戶的送貨地址,儲存最新的地址會更有意義,這樣,你不必擔心存在舊的地址。另一個用例是儲存狀態資訊,每次狀態改變時,應用程式會將新狀態寫入到Kafka。我們只需要關係最新的狀態,而不是之前發生的所有狀態。

Kafka支援此類用例,如果retention策略(log.cleanup.policy)設定為delete,會刪除超過保留時間的訊息;如果設定為compact,對具有相同key的訊息,只保留最新的值。顯然,把策略設定為compact只對包含key和value的訊息有作用,對key為null的訊息不起作用。

5.6 Compaction工作原理

Kafka的訊息可以分為2部分(如下圖所示):

  • 之前已精煉的訊息,此部分的每個key都是唯一的,對應的值是上一次精煉時的最新值。可以稱為“Clean”部分。
  • 在上一次精煉後寫入的訊息。可以稱為“Dirty”部分。

當Kafka啟動時,如果啟用了compaction(log.cleaner.enable,預設為true),每個broker將啟動一個compaction manager執行緒和多個cleaner執行緒,它們用於執行compaction的任務。每個執行緒會選擇一個有較多dirty訊息的分割槽,然後對其執行compaction任務。為了防止compaction過於頻繁,Kafka使用了配置log.cleaner.min.cleanable.ratio(預設值為0.5)來限定可進行compaction的最小dirty訊息比率。

要精煉一個分割槽,需要遍歷兩次日誌檔案,第一次遍歷cleaner執行緒讀取分割槽的dirty部分並建立一個記憶體對映。每個對映項由一個hash後的16位元組key和8位元組offset組成,這意味著每個對映項僅使用24位元組。如果要檢視一個1GB的segment並假設每個訊息大小是1KB,那麼該segment將包含1百萬個訊息,需要24MB記憶體來建立對映關係。如果有相同的key,會使用更少記憶體,因為可以重用相同的hash。這可以說是十分有效。

一旦cleaner執行緒建立offset的對映,會進行第二次遍歷,它將從最舊的clean segments開始讀取訊息,如果訊息的key已經存在於offset對映中,會保留較新的值(如下圖所示)。較新的值會被複制到一個以“.clean”為字尾的臨時檔案中,處理完後會把字尾改為“.swap”,然後刪除原來的日誌檔案,最後把字尾“.swap”去掉。

當配置Kafka時,管理員需要配置用於儲存這個對映的可使用記憶體大小(log.cleaner.dedupe.buffer.size,預設是134217728=128MB)和cleaner執行緒數(log.cleaner.threads,預設是1)。每個執行緒都會建立一個對映,但這個配置是針對所有執行緒的總記憶體大小。如果配置了1GB並且有5個cleaner執行緒,那麼每個執行緒會使用200MB記憶體。Kafka不要求分割槽的整個dirty部分都能夠儲存在分配的記憶體對映裡面,但至少一個segment能夠被儲存在此記憶體對映裡。如果不能,Kafka會記錄一個錯誤日誌,管理員需要分配更多的記憶體或者使用更少的cleaner執行緒。Kafka會先精煉能夠儲存在記憶體對映的最舊的segments,剩餘的segments會等待下次的compaction。

5.7 刪除事件

如果需要刪除指定key的訊息時,可以生成包含該key但把值設為null的訊息,我們稱此類訊息為墓碑訊息(tombstone)。當cleaner執行緒找到這樣的訊息時,它將首先進行正常的compaction並保留墓碑訊息一段時間(log.cleaner.delete.retention.ms=86400000=24小時)。在此期間,消費者可以讀取該訊息並知道它已被刪除。在此配置時間之後,cleaner執行緒將刪除這些墓碑訊息。

END O(∩_∩)O