1. 程式人生 > 其它 >Kafka——Kafka檔案儲存機制

Kafka——Kafka檔案儲存機制

 總結出自文章:https://www.jianshu.com/p/734cf729d77b

Kafka檔案儲存機制

  • 名詞概念
    • Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集。
    • Topic:一類訊息,訊息存放的目錄即主題,例如page view日誌、click日誌等都可以以topic的形式存在,Kafka叢集能夠同時負責多個topic的分發。
    • Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列
    • Segmen:partition物理上由多個segment組成,每個Segment存著message資訊
    • Producer :生產message傳送到topic
    • Consumer:訂閱topic消費message, consumer作為一個執行緒來消費
    • Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置檔案中配置好的。各個consumer(consumer 執行緒)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 執行緒 )消費,如果一個message可以被多個consumer(consumer 執行緒 ) 消費的話,那麼這些consumer必須在不同的組。Kafka不支援一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的資料的時候,由於要保證不能多個執行緒拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的效能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer執行緒去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴充套件,那麼再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴充套件性,吞吐量極高。這也就形成了分散式消費的概念。
  • 原理概念
    • 持久化
      • kafka使用檔案儲存訊息(append only log),這就直接決定kafka在效能上嚴重依賴檔案系統的本身特性.且無論任何OS下,對檔案系統本身的優化是非常艱難的.檔案快取/直接記憶體對映等是常用的手段.因為kafka是對日誌檔案進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將訊息暫時buffer起來,當訊息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO呼叫的次數.對於kafka而言,較高效能的磁碟,將會帶來更加直接的效能提升。
    • 效能
      • 除磁碟IO之外,我們還需要考慮網路IO,這直接關係到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將訊息buffer起來,當訊息的條數達到一定閥值時,批量傳送給broker;對於consumer端也是一樣,批量fetch多條訊息.不過訊息量的大小可以通過配置檔案來指定.對於kafka broker端,似乎有個sendfile系統呼叫可以潛在的提升網路IO的效能:將檔案的資料對映到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換(這裡涉及到"磁碟IO資料"/"核心記憶體"/"程序記憶體"/"網路緩衝區",多者之間的資料copy).

        其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用訊息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮.可以將任何在網路上傳輸的訊息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式。

    • 負載均衡
      • kafka叢集中的任何一個broker,都可以向producer提供metadata資訊,這些metadata中包含"叢集中存活的servers列表"/"partitions leader列表"等資訊(請參看zookeeper中的節點資訊). 當producer獲取到metadata資訊之後, producer將會和Topic下所有partition leader保持socket連線;訊息由producer直接通過socket傳送到broker,中間不會經過任何"路由層".

        非同步傳送,將多條訊息暫且在客戶端buffer起來,並將他們批量傳送到broker;小資料IO太多,會拖慢整體的網路延遲,批量延遲傳送事實上提升了網路效率;不過這也有一定的隱患,比如當producer失效時,那些尚未傳送的訊息將會丟失。

    • Topic模型
      • 其他JMS實現,訊息消費的位置是有prodiver保留,以便避免重複傳送訊息或者將沒有消費成功的訊息重發等,同時還要控制訊息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的訊息只有一個consumer在消費,且不存在訊息狀態的控制,也沒有複雜的訊息確認機制,可見kafka broker端是相當輕量級的.當訊息被consumer接收之後,consumer可以在本地儲存最後訊息的offset,並間歇性的向zookeeper註冊offset.由此可見,consumer客戶端也很輕量級。

        kafka中consumer負責維護訊息的消費記錄,而broker則不關心這些,這種設計不僅提高了consumer端的靈活性,也適度的減輕了broker端設計的複雜度;這是和眾多JMS prodiver的區別.此外,kafka中訊息ACK的設計也和JMS有很大不同,kafka中的訊息是批量(通常以訊息的條數或者chunk的尺寸為單位)傳送給consumer,當訊息消費成功後,向zookeeper提交訊息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬鬆"的設計,將會有"丟失"訊息/"訊息重發"的危險.

    • 訊息傳輸一致
      • Kafka提供3種訊息傳輸一致性語義:最多1次,最少1次,恰好1次。

        最少1次:可能會重傳資料,有可能出現數據被重複處理的情況;

        最多1次:可能會出現資料丟失情況;

        恰好1次:並不是指真正只傳輸1次,只不過有一個機制。確保不會出現“資料被重複處理”和“資料丟失”的情況。

        at most once: 消費者fetch訊息,然後儲存offset,然後處理訊息;當client儲存offset之後,但是在訊息處理過程中consumer程序失效(crash),導致部分訊息未能繼續處理.那麼此後可能其他consumer會接管,但是因為offset已經提前儲存,那麼新的consumer將不能fetch到offset之前的訊息(儘管它們尚沒有被處理),這就是"at most once".

        at least once: 消費者fetch訊息,然後處理訊息,然後儲存offset.如果訊息處理成功之後,但是在儲存offset階段zookeeper異常或者consumer失效,導致儲存offset操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的訊息,這就是"at least once".

        "Kafka Cluster"到消費者的場景中可以採取以下方案來得到“恰好1次”的一致性語義:

        最少1次+消費者的輸出中額外增加已處理訊息最大編號:由於已處理訊息最大編號的存在,不會出現重複處理訊息的情況。

    • 副本
      • kafka中,replication策略是基於partition,而不是topic;kafka將每個partition資料複製到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置檔案來設定。leader處理所有的read-write請求,follower需要和leader保持同步.Follower就像一個"consumer",消費訊息並儲存在本地日誌中;leader負責跟蹤所有的follower狀態,如果follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條訊息儲存成功,此訊息才被認為是"committed",那麼此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具有良好的網路環境.即使只有一個replicas例項存活,仍然可以保證訊息的正常傳送和接收,只要zookeeper叢集存活即可.

        選擇follower時需要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader。

    • log
    • 分散式
      • kafka使用zookeeper來儲存一些meta資訊,並使用了zookeeper watch機制來發現meta資訊的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)

        Broker node registry: 當一個kafka broker啟動後,首先會向zookeeper註冊自己的節點資訊(臨時znode),同時當broker和zookeeper斷開連線時,此znode也會被刪除.

        Broker Topic Registry: 當一個broker啟動時,會向zookeeper註冊自己持有的topic和partitions資訊,仍然是一個臨時znode.

        Consumer and Consumer group: 每個consumer客戶端被建立時,會向zookeeper註冊自己的資訊;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了效能考慮,讓partition相對均衡的分散到每個consumer上.

        Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置檔案指定,也可以由系統生成),此id用來標記消費者資訊.

        Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.

        Partition Owner registry: 用來標記partition正在被哪個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那麼將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"遊離"的partitions)

        當consumer啟動時,所觸發的操作:

        A) 首先進行"Consumer id Registry";

        B) 然後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那麼其他consumer接管partitions).

        C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.

        總結:

        1. Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連線併發送訊息.

        2. Broker端使用zookeeper用來註冊broker資訊,已經監測partition leader存活性.

        3. Consumer端使用zookeeper用來註冊consumer資訊,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連線,並獲取訊息。

    • Next
  • Next