apache kafka系列之原始碼分析走讀-kafka內部模組分析
apache kafka中國社群QQ群:162272557
kafka整體結構分析:
kafka原始碼工程目錄結構如下圖:
下面只對core目錄結構作說明,其他都是測試類或java客戶端程式碼
admin --管理員模組,操作和管理topic,paritions相關,包含create,delete topic,擴充套件 patitions
Api --該模組主要負責互動資料的組裝,客戶端與服務端互動資料編解碼
client --該模組比較簡單,就一個類,Producer讀取kafka broker元資料資訊,
topic和partitions,以及leader
cluster --該模組包含幾個實體類,Broker,Cluster,Partition,Replica,解釋他們之間關係: Cluster由多個broker組成,一個Broker包含多個partition,一個topic的所有
partitions分佈在不同broker的中,一個Replica包含多個Partition。
common --通用模組,只包含異常類和錯誤驗證
consumer --consumer處理模組,負責所有客戶端消費者資料和邏輯處理
contoroller --負責中央控制器選舉,partition的leader選舉,副本分配,副本重新分配,
partition和replica擴容。
javaapi --提供java的producer和consumer介面api
log --Kafka檔案儲存模組,負責讀寫所有kafka的topic訊息資料。
message --封裝多個訊息組成一個“訊息集”或壓縮訊息集。
metrics --內部狀態的監控模組
network --網路事件處理模組,負責處理和接收客戶端連線
producer --producer實現模組,包括同步和非同步傳送訊息。
serializer --序列化或反序列化當前訊息
kafka --kafka門面入口類,副本管理,topic配置管理,leader選舉實現(由contoroller模組呼叫
tools --一看這就是工具模組,包含內容比較多:
a.匯出對應consumer的offset值.
b.匯出LogSegments資訊,當前topic的log寫的位置資訊.
c.匯出zk上所有consumer的offset值.
d.修改註冊在zk的consumer的offset值.
f.producer和consumer的使用例子.
utils --Json工具類,Zkutils工具類,Utils建立執行緒工具類,KafkaScheduler公共排程器類,公共日誌類等等。
1.kafka啟動類:kafka.scala
kafka為kafka broker的main啟動類,其主要作用為載入配置,啟動report服務(內部狀態的監控),註冊釋放資源的鉤子,以及門面入口類。
kafka類程式碼如下:
......
try {
val props = Utils.loadProps(args(0)) //載入配置檔案
val serverConfig = new KafkaConfig(props)
KafkaMetricsReporter.startReporters(serverConfig.props) //啟動report服務(內部狀態的監控)
val kafkaServerStartble = new KafkaServerStartable(serverConfig) //kafka server核心入口類
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
//鉤子程式,當jvm退出前,銷燬所有資源
override def run() = {
kafkaServerStartble.shutdown
}
})
kafkaServerStartble.startup
kafkaServerStartble.awaitShutdown
}
......
KafkaServerStartble類包裝了KafkaSever類,其實啥都沒有做。只是呼叫包裝類而已
KafkaSever類是kafka broker執行控制的核心入口類,它是採用門面模式設計的。
kafka中KafkaServer類,採用門面模式,是網路處理,io處理等得入口.
ReplicaManager
副本管理
KafkaApis 處理所有request的Proxy類,根據requestKey決定調⽤用具體的handler
KafkaRequestHandlerPool 處理request的執行緒池,請求處理池 <-- num.io.threads io執行緒數量
LogManager kafka檔案儲存系統管理,負責處理和儲存所有Kafka的topic的partiton資料
TopicConfigManager
監聽此zk節點的⼦子節點/config/changes/,通過LogManager更新topic的配置資訊,topic粒度配置管理,具體請檢視topic級別配置
KafkaHealthcheck 監聽zk session expire,在zk上建立broker資訊,便於其他broker和consumer獲取其資訊
KafkaController
kafka叢集中央控制器選舉,leader選舉,副本分配。
KafkaScheduler
負責副本管理和日誌管理排程等等
ZkClient 負責註冊zk相關資訊.
BrokerTopicStats
topic資訊統計和監控
ControllerStats 中央控制器統計和監控
KafkaServer部分主要程式碼如下:
......
def startup() {
info("starting")
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
/* start scheduler */
kafkaScheduler.startup()
/* setup zookeeper */
zkClient = initZk()
/* start log manager */
logManager = createLogManager(zkClient)
logManager.startup()
socketServer = new SocketServer(config.brokerId,
config.hostName,
config.port,
config.numNetworkThreads,
config.queuedMaxRequests,
config.socketSendBufferBytes,
config.socketReceiveBufferBytes,
config.socketRequestMaxBytes)
socketServer.startup()
replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
kafkaController = new KafkaController(config, zkClient)
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
Mx4jLoader.maybeLoad()
replicaManager.startup()
kafkaController.startup()
topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()
/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
registerStats()
startupComplete.set(true);
info("started")
}
private def initZk(): ZkClient = {
info("Connecting to zookeeper on " + config.zkConnect)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}
/**
* Forces some dynamic jmx beans to be registered on server startup.
*/
private def registerStats() {
BrokerTopicStats.getBrokerAllTopicsStats()
ControllerStats.uncleanLeaderElectionRate
ControllerStats.leaderElectionTimer
}
.......