1. 程式人生 > >Kafka 原始碼剖析

Kafka 原始碼剖析

1.概述

  在對Kafka使用層面掌握後,進一步提升分析其原始碼是極有必要的。縱觀Kafka原始碼工程結構,不算太複雜,程式碼量也不算大。分析研究其實現細節難度不算太大。今天筆者給大家分析的是其核心處理模組,core模組。

2.內容

  首先,我們需要對Kafka的工程結構有一個整體的認知度,Kafka 大家最為熟悉的就是其消費者與生產者。然其,底層的儲存機制,選舉機制,備份機制等實現細節,需要我們對其原始碼仔細閱讀學習,思考與分析其設計之初的初衷。下面,我們首先來看看Kafka原始碼工程模組分佈,截止當天日期,官方託管在 Github 上的 Kafka 原始碼版本為:0.10.2.1,其工程分佈結構如下圖所示:

  這裡筆記只針對core模組進行說明,其他模組均是啟動指令碼,文件說明,測試類或是Java客戶端的相關程式碼,本篇部落格就不多做贅述了。

模組名 說明
admin kafka的管理員模組,操作和管理其topic,partition相關,包含建立,刪除topic,或者拓展分割槽等。
api 主要負責資料互動,客戶端與服務端互動資料的編碼與解碼。
client 該模組下就一個類,producer讀取kafka broker元資料資訊,topic和分割槽,以及leader。
cluster 這裡包含多個實體類,有Broker,Cluster,Partition,Replica。其中一個Cluster由多個Broker組成,一個Broker包含多個Partition,一個Topic的所有Partition分佈在不同的Broker中,一個Replica包含都個Partition。
common 這是一個通用模組,其只包含各種異常類以及錯誤驗證。
consumer 消費者處理模組,負責所有的客戶端消費者資料和邏輯處理。
controller 此模組負責中央控制器的選舉,分割槽的Leader選舉,Replica的分配或其重新分配,分割槽和副本的擴容等。
coordinator 負責管理部分consumer group和他們的offset。
javaapi 提供Java語言的producer和consumer的API介面。
log 這是一個負責Kafka檔案儲存模組,負責讀寫所有的Kafka的Topic訊息資料。
message 封裝多條資料組成一個數據集或者壓縮資料集。
metrics 負責內部狀態的監控模組。
network 該模組負責處理和接收客戶端連線,處理網路時間模組。
producer 生產者的細節實現模組,包括的內容有同步和非同步的訊息傳送。
security 負責Kafka的安全驗證和管理模組。
serializer 序列化和反序列化當前訊息內容。
server 該模組涉及的內容較多,有Leader和Offset的checkpoint,動態配置,延時建立和刪除Topic,Leader的選舉,Admin和Replica的管理,以及各種元資料的快取等內容。
tools 閱讀該模組,就是一個工具模組,涉及的內容也比較多。有匯出對應consumer的offset值;匯出LogSegments資訊,以及當前Topic的log寫的Location資訊;匯出Zookeeper上的offset值等內容。
utils 各種工具類,比如Json,ZkUtils,執行緒池工具類,KafkaScheduler公共排程器類,Mx4jLoader監控載入器,ReplicationUtils複製集工具類,CommandLineUtils命令列工具類,以及公共日誌類等內容。

3.原始碼環境

  閱讀Kafka原始碼需要準備以下環境:

  • JDK
  • IDE(Eclipse,IDEA或者其他)
  • gradle

  關於環境的搭建,大家可以利用搜索引擎去完成,比較基礎,這裡就不多贅述了。然後在原始碼工程目錄下執行以下命令:

  • gradle idea(編輯器為IDEA)
  • gradle eclipse(編輯器為Eclipse)

  如何選擇,可按照自己所使用的編輯器即可。這裡筆者所使用的是IDEA,執行命令後,會在原始碼目錄生成以下檔案,如下圖所示:

  然後,在編輯器中匯入該原始碼專案工程即可,如下圖所示:

4.執行原始碼

  這裡,我們先在config模組下設定server.properties檔案,按照自己的需要設定,比如分割槽數,log的儲存路徑,zookeeper的地址設定等等。然後,我們在編輯器中的執行中設定相關的啟動引數,如下圖所示:

  啟動類Kafka.scala在core模組下,需要注意的是,這裡在啟動Kafka之前,確保我們之前在server.properties檔案中所配置的Zookeeper叢集已正常執行,然後我們在編輯器中執行Kafka原始碼,如下圖所示:

5.預覽結果

  這裡,我們做一下簡單的修改,在啟動類的開頭列印一句啟動日誌和啟動時間,部分執行日誌和執行結果截圖如下所示:

Start Kafka,DateTime[1494065094606]
[2017-05-06 18:04:54,830] INFO KafkaConfig values: 
    advertised.host.name = null
    advertised.listeners = null
    advertised.port = null
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 0
    broker.id.generation.enable = true
    broker.rack = null
    compression.type = producer
    connections.max.idle.ms = 600000
    controlled.shutdown.enable = true

  如上圖,紅色框即是我們簡單的新增的一句程式碼。

6.總結

  本篇部落格給大家介紹了Kafka原始碼的core模組下各個子模組所負責的內容,以及如何便捷的去閱讀原始碼,以及在編輯器中執行Kafka原始碼。後續,再為大家分析Kafka的儲存機制,選舉機制,備份機制等內容的實現細節。最後,歡迎大家使用Kafka-Eagle監控工具。

7.結束語

  這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或傳送郵件給我,我會盡我所能為您解答,與君共勉!