1. 程式人生 > >apache kafka系列之原始碼分析走讀-kafka內部模組分析

apache kafka系列之原始碼分析走讀-kafka內部模組分析

apache kafka中國社群QQ群:162272557

kafka整體結構分析:

kafka原始碼工程目錄結構如下圖:


下面只對core目錄結構作說明,其他都是測試類或java客戶端程式碼

admin   --管理員模組,操作和管理topic,paritions相關,包含create,delete topic,擴充套件 patitions

Api       --該模組主要負責互動資料的組裝,客戶端與服務端互動資料編解碼

client   --該模組比較簡單,就一個類,Producer讀取kafka broker元資料資訊,

topic和partitions,以及leader

cluster   --該模組包含幾個實體類,Broker,Cluster,Partition,Replica,解釋他們之間關係: Cluster由多個broker組成,一個Broker包含多個partition,一個topic的所有

partitions分佈在不同broker的中,一個Replica包含多個Partition。

common    --通用模組,只包含異常類和錯誤驗證

consumer    --consumer處理模組,負責所有客戶端消費者資料和邏輯處理

contoroller  --負責中央控制器選舉,partition的leader選舉,副本分配,副本重新分配,

partition和replica擴容。

javaapi  --提供java的producer和consumer介面api

log          --Kafka檔案儲存模組,負責讀寫所有kafka的topic訊息資料。

message    --封裝多個訊息組成一個“訊息集”或壓縮訊息集。

metrics    --內部狀態的監控模組

network        --網路事件處理模組,負責處理和接收客戶端連線

producer        --producer實現模組,包括同步和非同步傳送訊息。

serializer        --序列化或反序列化當前訊息

kafka         --kafka門面入口類,副本管理,topic配置管理,leader選舉實現(由contoroller模組呼叫

)。

tools           --一看這就是工具模組,包含內容比較多:

a.匯出對應consumer的offset值.

b.匯出LogSegments資訊,當前topic的log寫的位置資訊.

c.匯出zk上所有consumer的offset值.

d.修改註冊在zk的consumer的offset值.

f.producer和consumer的使用例子.

utils  --Json工具類,Zkutils工具類,Utils建立執行緒工具類,KafkaScheduler公共排程器類,公共日誌類等等。


1.kafka啟動類:kafka.scala

kafka為kafka broker的main啟動類,其主要作用為載入配置,啟動report服務(內部狀態的監控),註冊釋放資源的鉤子,以及門面入口類。

kafka類程式碼如下:

......

 try {
      val props = Utils.loadProps(args(0))          //載入配置檔案

      val serverConfig = new KafkaConfig(props)
      KafkaMetricsReporter.startReporters(serverConfig.props)             //啟動report服務(內部狀態的監控)
      val kafkaServerStartble = new KafkaServerStartable(serverConfig)    //kafka server核心入口類
      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {                 //鉤子程式,當jvm退出前,銷燬所有資源
        override def run() = {
          kafkaServerStartble.shutdown
        }
      })


      kafkaServerStartble.startup
      kafkaServerStartble.awaitShutdown
    }

......

KafkaServerStartble類包裝了KafkaSever類,其實啥都沒有做。只是呼叫包裝類而已

KafkaSever類是kafka broker執行控制的核心入口類,它是採用門面模式設計的。



kafka中KafkaServer類,採用門面模式,是網路處理,io處理等得入口.

ReplicaManager   副本管理

KafkaApis   處理所有request的Proxy類,根據requestKey決定調⽤用具體的handler

KafkaRequestHandlerPool 處理request的執行緒池,請求處理池  <-- num.io.threads io執行緒數量

LogManager    kafka檔案儲存系統管理,負責處理和儲存所有Kafka的topic的partiton資料

TopicConfigManager 監聽此zk節點的⼦子節點/config/changes/,通過LogManager更新topic的配置資訊,topic粒度配置管理,具體請檢視topic級別配置

KafkaHealthcheck 監聽zk session expire,在zk上建立broker資訊,便於其他broker和consumer獲取其資訊

KafkaController kafka叢集中央控制器選舉,leader選舉,副本分配。

KafkaScheduler 負責副本管理和日誌管理排程等等

ZkClient         負責註冊zk相關資訊.

BrokerTopicStats topic資訊統計和監控

ControllerStats          中央控制器統計和監控

KafkaServer部分主要程式碼如下:

......  
def startup() {
    info("starting")
    isShuttingDown = new AtomicBoolean(false)
    shutdownLatch = new CountDownLatch(1)

    /* start scheduler */
    kafkaScheduler.startup()
    
    /* setup zookeeper */
    zkClient = initZk()

    /* start log manager */
    logManager = createLogManager(zkClient)
    logManager.startup()

    socketServer = new SocketServer(config.brokerId,
                                    config.hostName,
                                    config.port,
                                    config.numNetworkThreads,
                                    config.queuedMaxRequests,
                                    config.socketSendBufferBytes,
                                    config.socketReceiveBufferBytes,
                                    config.socketRequestMaxBytes)
    socketServer.startup()

    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
    kafkaController = new KafkaController(config, zkClient)
    
    /* start processing requests */
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
   
    Mx4jLoader.maybeLoad()

    replicaManager.startup()

    kafkaController.startup()
    
    topicConfigManager = new TopicConfigManager(zkClient, logManager)
    topicConfigManager.startup()
    
    /* tell everyone we are alive */
    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
    kafkaHealthcheck.startup()

    
    registerStats()
    startupComplete.set(true);
    info("started")
  }
  
  private def initZk(): ZkClient = {
    info("Connecting to zookeeper on " + config.zkConnect)
    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
    ZkUtils.setupCommonPaths(zkClient)
    zkClient
  }

  /**
   *  Forces some dynamic jmx beans to be registered on server startup.
   */
  private def registerStats() {
    BrokerTopicStats.getBrokerAllTopicsStats()
    ControllerStats.uncleanLeaderElectionRate
    ControllerStats.leaderElectionTimer
  }
.......