kafkaSpout工作流程簡介
一切從kafkaSpout的open函式開始。
①、new一個ZKState的類,這個類主要是用於讀寫zk的資料。接下來的很多類會呼叫這個類中的方法,來獲取partition資訊、broker資訊和offset資訊,並更新offset資訊。
②、new一個DynamicPartitionConnections類。這個類的_reader變數會存放哪個partition的leader在哪個broker上,還有每個border的host和port這些資訊。這裡要注意一點,這個資訊的獲取的路徑,預設是zk的/brokers/topics和/brokers/ids下面。如果kafka的配置檔案裡面有指定zk的目錄級別,那spoutConf的brokerHosts就要指定這個路徑。
③、new一個ZkCoordinator或者StaticCoordinator類。這個類主要用來分配哪些partition分配給這個task,然後為分配而來的每個partition建立一個PartitionManager。每個PartitionManager負責一個partition的管理。獲取該partition的資料、記錄offset都靠這個類來協調。這個類被建立的時候,會根據自己負責的partition去zk那裡獲取初始化的offset。獲取offset的zk路徑是通過spoutConf的zkRoot和id設定的。
open函式的前期處理大概就以上這些。然後進入nextTuple函式。
①、從ZkCoordinator那裡獲取該task需要管理的partition對應的partitionManager。然後遍歷各個partitionManager,呼叫每個partitionManager的next函式。
②、next函式先判斷_waitintToEmit物件是否為空,如果為空,就呼叫fill函式進行填充。
③、fill函式主要通過KafkaUtils.fetchMessages從kafka獲取訊息回來,獲取回來的每個訊息會帶有offset。然後遍歷這些訊息,如果訊息的cur_offset大於歷史offset,就把訊息新增到_waitintToEmit中,並把cur_offset新增到_pending中。
④、回到next函式,遍歷_waitintToEmit,把message發射出去。然後把offset從_pending中移除。
⑤、經過②、③、④這三個步驟,就把kafka中的資料發射出去了。
⑥、隔一段時間執行每個partitionManager的commit方法,這個方法就是把已經發射出去的訊息的最大offset記錄到zk的對應目錄下。