kafka原始碼之kafkaserver的啟動
KAFKA的啟動
Kafka啟動時,通過進入kafka的bin路徑下,執行如下指令碼:
./kafka-server-start.sh ../config/server.properties
這個指令碼會啟動Kafka類的例項,並執行main函式,傳入的引數是server.properties的路徑.
defmain(args: Array[String]): Unit = {try {
載入對應的server.properties配置檔案,並生成Properties例項.val serverProps = getPropsFromArgs(args)
這裡生成一個KafkaServer的例項,這個例項生成時,會在例項中同時生成一個KafkaServer的例項,
生成KafkaServer例項前,需要先通過serverProps生成出一個KafkaConfig的例項.val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
新增對kill操作的勾子函式.用於處理,如果直接kill時關閉kafkaserver.// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread() {override def run() = { kafkaServerStartable.shutdown
根據properties生成server例項
在KafkaServerStartable.fromProps(serverProps)函式呼叫時,也就是kakfa啟動時,
new KafkaServerStartable(KafkaConfig.
KafkaServerStartable例項生成時,會生成KafkaServer例項:
class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {private val server = new KafkaServer(serverConfig)
KafkaConfig.fromProps(serverProps)的函式呼叫流程:
deffromProps(props: Properties): KafkaConfig =fromProps(props, true)def fromProps(props: Properties, doLog: Boolean): KafkaConfig =new KafkaConfig(props, doLog)
KafkaServer例項用於對所有的元件進行統一的初始化與啟動.
KafkaServer的啟動
在Kafka的main函式中執行startup時,會呼叫KafkaServer例項中的startup函式.
在KafkaServer的例項生成時,會在每個logDir的目錄下生成一個meta.properties配置檔案,
這個檔案中主要記錄有這個kafka的版本與broker.id的值.
KafkaServer的例項啟動時,會生成kafka對外服務的socket server與相關元件,並對其進行啟動.
在執行startup函式時,下面分析下這個函式的具體的執行流程:
1,設定brokerState的狀態為Starting的狀態.
brokerState.newState(Starting)
2,啟動kafka的排程器,這個KafkaScheduler的例項生成時需要得到background.threads配置的值,預設是10個,用於配置後臺執行緒池的個數.
/* start scheduler */kafkaScheduler.startup()
3,初始化與zookeeper的連線,
這裡需要的配置項:
配置項zookeeper.connect,預設值localhost:2181.用於設定kafka連線的zookeeper的連線地址.
配置項zookeeper.session.timeout.ms,預設值6000ms,用於控制zk的session的超時時間,可設定為同步時間的2倍或3倍.
配置項zookeeper.connection.timeout.ms,預設值6000ms,用於配置連線zk的連線超時時間.
配置項zookeeper.sync.time.ms,預設值2000ms,用於與zk進行同步的時間間隔,
配置項zookeeper.set.acl,是否啟用zookeeper的acl控制,預設值為false,表示不啟用.
這裡得到的zkUtils例項是一個ZkUtils的例項,在例項生成後,會判斷zk中是否存在如下地址,如果不存在,會建立對應的路徑在zk上.
路徑/consumers,這個路徑用於消費者的client.id儲存對應消費的offset的路徑.
路徑/brokers/ids,這個路徑用於儲存所有的broker id的路徑.
路徑/brokers/topics,用於儲存每個broker對應的topics的資訊,
路徑/config/changes,還不知道,後期用到在說.
路徑/config/topics,還不知道,後期用到在說.
路徑/config/clients,還不知道,後期用到在說.
路徑/admin/delete_topics,用於儲存刪除的topic的資訊.
路徑/brokers/seqid,還不知道,後期用到在說.
路徑/isr_change_notification,這個用於在kafka的副本broker發生變化時用於通知的儲存路徑.
/* setup zookeeper */zkUtils = initZk()
4,初始化建立並啟動LogManager的例項,
/* start log manager */logManager = createLogManager(zkUtils.zkClient, brokerState)logManager.startup()
5,得到當前配置檔案中的brokerId的資訊.
如果broker.id的配置沒有配置(小於0的值時),同時broker.id.generation.enable配置為true,預設也就是true,這個時候根據zk中/brokers/seqid路徑的version值,第一次從0開始,每次增加.並加上reserved.broker.max.id配置的值,預設是1000,來充當這個server的broker.id,同時把這個broker.id更新到logDir目錄下的meta.properties檔案中,下次讀取時,直接讀取這個配置檔案中的broker.id的值,而不需要重新進行建立.
/* generate brokerId */config.brokerId = getBrokerIdthis.logIdent = "[Kafka Server " + config.brokerId + "], "
6,生成並啟動kafka的SocketServer.
socketServer = new SocketServer(config, metrics, kafkaMetricsTime)socketServer.startup()
7,生成並啟動ReplicaManager,此例項依賴kafkaScheduler與logManager例項.
/* start replica manager */replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime,
zkUtils, kafkaScheduler, logManager,isShuttingDown)replicaManager.startup()
8,生成並啟動KafkaController例項,此使用用於控制當前的broker中的所有的leader的partition的操作.
/* start kafka controller */kafkaController = new KafkaController(config, zkUtils, brokerState,
kafkaMetricsTime, metrics, threadNamePrefix)kafkaController.startup()
9,生成並啟動GroupCoordinator的例項,這個是0.9新加入的一個玩意,用於對consumer中新加入的與partition的檢查,並對partition與consumer進行平衡操作.
/* start kafka coordinator */consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)consumerCoordinator.startup()
10,根據authorizer.class.name配置項配置的Authorizer的實現類,生成一個用於認證的例項,用於對使用者的操作進行認證.這個預設為不認證.
/* Get the authorizer and initialize it if one is specified.*/authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>val authZ = CoreUtils.createObject[Authorizer](authorizerClassName) authZ.configure(config.originals()) authZ}
11,生成用於對外對外提供服務的KafkaApis例項,並設定當前的broker的狀態為執行狀態.
/* start processing requests */apis = new KafkaApis(socketServer.requestChannel, replicaManager,
consumerCoordinator,kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics,
authorizer)requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)brokerState.newState(RunningAsBroker)
12,生成動態配置修改的處理管理,主要是topic修改與client端配置的修改,並把已經存在的clientid對應的配置進行修改.
/* start dynamic config manager */dynamicConfigHandlers = Map[String, ConfigHandler](
ConfigType.Topic -> new TopicConfigHandler(logManager),ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers)
)// TODO: Move this logic to DynamicConfigManagerAdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)}// Create the config manager. start listening to notificationsdynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)dynamicConfigManager.startup()
13,生成kafka的心跳檢查處理工具,這裡需要使用到listeners的配置,根據是否在IAAS的環境下,需要使用到advertised相關配置,
如果advertised.listeners配置項存在,直接使用配置的listener,
否則,如果advertised.host.name配置項或者advertised.port配置項存在,使用這兩個配置項,並使用明文傳輸(PLAINTEXT://host:port),如果advertised.port沒有配置,直接使用port的配置,host可以沒有設定
最後,如果上面的不都滿足,直接使用listeners的配置.預設是PLAINTEXT://:port
/* tell everyone we are alive */val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>if (endpoint.port == 0) (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))else(protocol, endpoint)}kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)kafkaHealthcheck.startup()