1. 程式人生 > >Kafka Stream

Kafka Stream

成本高 增加 string 級別 開源 事件處理 manager 區別 效率

Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature(當前:1.0.0-rc0,參見:https://github.com/apache/kafka/releases),它提供了對存儲於Kafka內的數據進行流式處理和分析的功能。其主要特點如下:

  • Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應用中,也可以任意方式打包和部署
  • 除了Kafka外,無任何外部依賴
  • 充分利用Kafka分區機制實現水平擴展和順序性保證
  • 通過可容錯的state store實現高效的狀態操作(如windowed join和aggregation)
  • 支持正好一次處理語義(exactly once)
  • 提供記錄級的處理能力,從而實現毫秒級的低延遲
  • 支持基於事件時間的窗口操作,並且可處理晚到的數據(late arrival of records)
  • 同時提供底層的處理原語Processor(類似於Storm的spout和bolt),以及高層抽象的DSL(類似於Spark的map/group/reduce)

簡言之,Kafka Streams解決了流式處理中的如下困難問題

  • 毫秒級延遲的逐個事件處理
  • 有狀態的處理,包括分布式連接和聚合
  • 方便的DSL
  • 使用類似DataFlow的模型對無序數據進行窗口化
  • 具有快速故障切換的分布式處理和容錯能力
  • 無停機滾動部署

為什麽要有Kafka Stream


當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基於Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。既然Apache Spark與Apache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?主要有如下原因:

  • 第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基於Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,並且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。
  • 技術分享
  • 第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復雜。而Kafka Stream作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。更為重要的是,Kafka Stream充分利用了Kafka的分區機制和Consumer的Rebalance機制,使得Kafka Stream可以非常方便的水平擴展,並且各個實例可以使用不同的部署方式。具體來說,每個運行Kafka Stream的應用程序實例都包含了Kafka Consumer實例,多個同一應用的實例之間並行處理數據集。而不同實例之間的部署方式並不要求一致,比如部分實例可以運行在Web容器中,部分實例可運行在Docker或Kubernetes中。
  • 第三,就流式處理系統而言,基本都支持Kafka作為數據源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標準數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低
  • 第四,使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對於應用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留內存。
  • 第五,由於Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。
  • 第六,由於Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整並行度。

KTable vs. KStream


KTable和KStream是Kafka Stream中非常重要的兩個概念,它們是Kafka實現各種語義的基礎。因此這裏有必要分析下二者的區別。

  • KStream是一個數據流,可以認為所有記錄都通過Insert only的方式插入進這個數據流裏。
  • KTable: 代表一個完整的數據集可以理解為數據庫中的表。由於每條記錄都是Key-Value對,這裏可以將Key理解為數據庫中的Primary Key,而Value可以理解為一行記錄。可以認為KTable中的數據都是通過Update only的方式進入的。也就意味著,如果KTable對應的Topic中新進入的數據的Key已經存在,那麽從KTable只會取出同一Key對應的最後一條數據,相當於新的數據更新了舊的數據。

以下圖為例,假設有一個KStream和KTable,基於同一個Topic創建,並且該Topic中包含如下圖所示5條數據。此時遍歷KStream將得到與Topic內數據完全一樣的所有5條數據,且順序不變。而此時遍歷KTable時,因為這5條記錄中有3個不同的Key,所以將得到3條記錄,每個Key對應最新的值,並且這三條數據之間的順序與原來在Topic中的順序保持一致。這一點與Kafka的日誌compact相同。

技術分享

此時如果對該KStream和KTable分別基於key做Group,對Value進行Sum,得到的結果將會不同。對KStream的計算結果是<Jack,4>,<Lily,7>,<Mike,4>。而對Ktable的計算結果是<Mike,4>,<Jack,3>,<Lily,5>

State store

流式處理中,部分操作是無狀態的,例如過濾操作(Kafka Stream DSL中用filer方法實現)。而部分操作是有狀態的,需要記錄中間狀態,如Window操作和聚合計算。

State store被用來存儲中間狀態。它可以是一個持久化的Key-Value存儲,也可以是內存中的HashMap,或者是數據庫。Kafka提供了基於Topic的狀態存儲。Topic中存儲的數據記錄本身是Key-Value形式的,同時Kafka的log compaction機制可對歷史數據做compact操作,保留每個Key對應的最後一個Value,從而在保證Key不丟失的前提下,減少總數據量,從而提高查詢效率。

構造KTable時,需要指定其state store name。默認情況下,該名字也即用於存儲該KTable的狀態的Topic的名字,遍歷KTable的過程,實際就是遍歷它對應的state store,或者說遍歷Topic的所有key,並取每個Key最新值的過程。為了使得該過程更加高效,默認情況下會對該Topic進行compact操作。

另外,除了KTable,所有狀態計算,都需要指定state store name,從而記錄中間狀態

時間:

在流式數據處理中,時間是數據的一個非常重要的屬性。從Kafka 0.10開始,每條記錄除了Key和Value外,還增加了timestamp屬性。目前Kafka Stream支持三種時間

  • 事件發生時間:事件發生的時間包含在數據記錄中。發生時間由Producer在構造ProducerRecord時指定。並且需要Broker或者Topic將message.timestamp.type設置為CreateTime(默認值)才能生效。
  • 消息接收時間:也即消息存入Broker的時間。當Broker或Topic將message.timestamp.type設置為LogAppendTime時生效。此時Broker會在接收到消息後,存入磁盤前,將其timestamp屬性值設置為當前機器時間。一般消息接收時間比較接近於事件發生時間,部分場景下可代替事件發生時間。
  • 消息處理時間:也即Kafka Stream處理消息時的時間。

註:Kafka Stream允許通過實現org.apache.kafka.streams.processor.TimestampExtractor接口自定義記錄時間。

窗口:

流式數據是在時間上無界的數據而聚合操作只能作用在特定的數據集,也即有界的數據集上因此需要通過某種方式從無界的數據集上按特定的語義選取出有界的數據。窗口是一種非常常用的設定計算邊界的方式。不同的流式處理系統支持的窗口類似,但不盡相同。Kafka Stream支持的窗口如下:

  • Hopping Time Window: 該窗口定義如下圖所示。它有兩個屬性,一個是Window size,一個是Advance interval。Window size指定了窗口的大小,也即每次計算的數據集的大小。而Advance interval定義輸出的時間間隔。一個典型的應用場景是,每隔5秒鐘輸出一次過去1個小時內網站的PV或者UV
  • 技術分享
  • Tumbling Time Window: 可以認為它是Hopping Time Window的一種特例,也即Window size和Advance interval相等。它的特點是各個Window之間完全不相交
  • Sliding Window:該窗口只用於2個KStream進行Join計算時。該窗口的大小定義了Join兩側KStream的數據記錄被認為在同一個窗口的最大時間差。假設該窗口的大小為5秒,則參與Join的2個KStream中,記錄時間差小於5的記錄被認為在同一個窗口中,可以進行Join計算。
  • Session Window:該窗口用於對Key做Group後的聚合操作中。它需要對Key做分組,然後對組內的數據根據業務需求定義一個窗口的起始點和結束點。一個典型的案例是,希望通過Session Window計算某個用戶訪問網站的時間。對於一個特定的用戶(用Key表示)而言,當發生登錄操作時,該用戶(Key)的窗口即開始,當發生退出操作或者超時時,該用戶(Key)的窗口即結束。窗口結束時,可計算該用戶的訪問時間或者點擊次數等。

Join:

kafka Stream由於包含KStream和Ktable兩種數據集,因此提供如下Join計算

  • KTable Join KTable 結果仍為KTable。任意一邊有更新,結果KTable都會更新。
  • KStream Join KStream 結果為KStream。必須帶窗口操作,否則會造成Join操作一直不結束。
  • KStream Join KTable / GlobakKTable 結果為KStream。只有當KStream中有新數據時,才會觸發Join計算並輸出結果。KStream無新數據時,KTable的更新並不會觸發Join計算,也不會輸出數據。並且該更新只對下次Join生效。一個典型的使用場景是,KStream中的訂單信息與KTable中的用戶信息做關聯計算。

對於Join操作,如果要得到正確的計算結果,需要保證參與Join的KTable或KStream中Key相同的數據被分配到同一個Task。具體方法是

  • 參與Join的KTable或KStream的Key類型相同(實際上,業務含意也應該相同)
  • 參與Join的KTable或KStream對應的Topic的Partition數相同
  • Partitioner策略的最終結果等效(實現不需要完全一樣,只要效果一樣即可),也即Key相同的情況下,被分配到ID相同的Partition內

聚合與亂序處理:

聚合操作可應用於KStream和KTable。當聚合發生在KStream上時必須指定窗口,從而限定計算的目標數據集。需要說明的是,聚合操作的結果肯定是KTable。因為KTable是可更新的,可以在晚到的數據到來時(也即發生數據亂序時)更新結果KTable。

這裏舉例說明。假設對KStream以5秒為窗口大小,進行Tumbling Time Window上的Count操作。並且KStream先後出現時間為1秒, 3秒, 5秒的數據,此時5秒的窗口已達上限,Kafka Stream關閉該窗口,觸發Count操作並將結果3輸出到KTable中(假設該結果表示為<1-5,3>)。若1秒後,又收到了時間為2秒的記錄,由於1-5秒的窗口已關閉,若直接拋棄該數據,則可認為之前的結果<1-5,3>不準確。而如果直接將完整的結果<1-5,4>輸出到KStream中,則KStream中將會包含該窗口的2條記錄,<1-5,3>, <1-5,4>,也會存在骯數據。因此Kafka Stream選擇將聚合結果存於KTable中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。用戶可得到完整的正確的結果。這種方式保證了數據準確性,同時也提高了容錯性。

但需要說明的是,Kafka Stream並不會對所有晚到的數據都重新計算並更新結果集,而是讓用戶設置一個retention period,將每個窗口的結果集在內存中保留一定時間,該窗口內的數據晚到時,直接合並計算,並更新結果KTable。超過retention period後,該窗口結果將從內存中刪除,並且晚到的數據即使落入窗口,也會被直接丟棄。

容錯:

Kafka Stream從如下幾個方面進行容錯:

  • 高可用的Partition保證無數據丟失。每個Task計算一個Partition,而Kafka數據復制機制保證了Partition內數據的高可用性,故無數據丟失風險。同時由於數據是持久化的,即使任務失敗,依然可以重新計算。
  • 狀態存儲實現快速故障恢復和從故障點繼續處理。對於Join和聚合及窗口等有狀態計算,狀態存儲可保存中間狀態。即使發生Failover或Consumer Rebalance,仍然可以通過狀態存儲恢復中間狀態,從而可以繼續從Failover或Consumer Rebalance前的點繼續計算。
  • KTable與retention period提供了對亂序數據的處理能力。

Kafka Stream整體架構


kafka stream的架構如下:

技術分享

前(Kafka 0.11.0.0)Kafka Stream的數據源只能如上圖所示是Kafka。但是處理結果並不一定要如上圖所示輸出到Kafka。上圖中的Consumer和Producer並不需要開發者在應用中顯示實例化,而是由Kafka Stream根據參數隱式實例化和管理,從而降低了使用門檻。開發者只需要專註於開發核心業務邏輯,也即上圖中Task內的部分。

Processor Topology:基於Kafka Stream的流式應用的業務邏輯全部通過一個被稱為Processor Topology的地方執行。它與Storm的Topology和Spark的DAG類似,都定義了數據在各個處理單元(在Kafka Stream中被稱作Processor)間的流動方式,或者說定義了數據的處理邏輯。

Kafka Stream並行模型:Kafka Stream的並行模型中,最小粒度為Task,而每個Task包含一個特定子Topology的所有Processor。因此每個Task所執行的代碼完全一樣,唯一的不同在於所處理的數據集互補。如下圖展示了在一個進程(Instance)中以2個Topic(Partition數均為4)為數據源的Kafka Stream應用的並行模型。從圖中可以看到,由於Kafka Stream應用的默認線程數為1,所以4個Task全部在一個線程中運行。

技術分享

為了充分利用多線程的優勢,可以設置Kafka Stream的線程數。下圖展示了線程數為2時的並行模型。

技術分享

Kafka Stream可被嵌入任意Java應用(理論上基於JVM的應用都可以)中,下圖展示了在同一臺機器的不同進程中同時啟動同一Kafka Stream應用時的並行模型。註意,這裏要保證兩個進程的StreamsConfig.APPLICATION_ID_CONFIG完全一樣。因為Kafka Stream將APPLICATION_ID_CONFI作為隱式啟動的Consumer的Group ID。只有保證APPLICATION_ID_CONFI相同,才能保證這兩個進程的Consumer屬於同一個Group,從而可以通過Consumer Rebalance機制拿到互補的數據集。

技術分享

既然實現了多進程部署,可以以同樣的方式實現多機器部署。該部署方式也要求所有進程的APPLICATION_ID_CONFIG完全一樣。從圖上也可以看到,每個實例中的線程數並不要求一樣。但是無論如何部署,Task總數總會保證一致。

技術分享

應用示例


示例完整代碼地址: https://github.com/habren/KafkaExample ,Schemal結構說明:

  • orderStream: 訂單KStream, 底層Topic的Partition數為3, Key為用戶名, Value包含用戶名,商品名,訂單時間,數量。
  • userTable: 用戶KTable, 底層Topic的Partition數為3, Key為用戶名, Value包含性別,地址和年齡。
  • itemTable: 商品KTable, 底層Topic的Partition數為6, Key為商品名, Value包含價格,種類和產地。

現在希望計算每小時購買產地與自己所在地相同的用戶總數。

  1. 首先由於希望使用訂單時間,而它包含在orderStream的Value中,需要通過提供一個實現TimestampExtractor接口的類從orderStream對應的Topic中抽取出訂單時
    • public class OrderTimestampExtractor implements TimestampExtractor {
      
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
          if(record instanceof Order) {
            return ((Order)record).getTS();
          } else {
            return 0;
          }
        }
      }

  2. 通過將orderStream與userTable進行Join,來獲取訂單用戶所在地。由於二者對應的Topic的Partition數相同,且Key都為用戶名,再假設Producer往這兩個Topic寫數據時所用的Partitioner實現相同,則此時上文所述Join條件滿足,可直接進行Join(代碼註釋: 從下面代碼中,可以看到,Join時需要指定如何從參與Join雙方的記錄生成結果記錄的Value。Key不需要指定,因為結果記錄的Key與Join Key相同,故無須指定。Join結果存於名為orderUserStream的KStream中)
    • orderUserStream = orderStream
          .leftJoin(userTable, 
               // 該lamda表達式定義了如何從orderStream與userTable生成結果集的Value
              (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
               // 結果集Key序列化方式
              Serdes.String(),
               // 結果集Value序列化方式
               SerdesFactory.serdFrom(Order.class))
          .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

  3. 接下來需要將orderUserStream與itemTable進行Join,從而獲取商品產地。此時orderUserStream的Key仍為用戶名,而itemTable對應的Topic的Key為產品名,並且二者的Partition數不一樣,因此無法直接Join。此時需要通過through方法,對其中一方或雙方進行重新分區,使得二者滿足Join條件。這一過程相當於Spark的Shuffle過程和Storm的FieldGrouping(代碼註釋:從下面代碼可見,through時需要指定Key的序列化器,Value的序列化器,以及分區方式和結果集所在的Topic。這裏要註意,該Topic(orderuser-repartition-by-item)的Partition數必須與itemTable對應Topic的Partition數相同,並且through使用的分區方法必須與iteamTable對應Topic的分區方式一樣。經過這種through操作,orderUserStream與itemTable滿足了Join條件,可直接進行Join)
    • orderUserStrea
          .through(
              // Key的序列化方式
              Serdes.String(),
              // Value的序列化方式 
              SerdesFactory.serdFrom(OrderUser.class), 
              // 重新按照商品名進行分區,具體取商品名的哈希值,然後對分區數取模
              (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
              "orderuser-repartition-by-item")
          .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))

小結:


  1. Kafka Stream的並行模型完全基於Kafka的分區機制和Rebalance機制,實現了在線動態調整並行度
  2. 同一Task包含了一個子Topology的所有Processor,使得所有處理邏輯都在同一線程內完成,避免了不必的網絡通信開銷,從而提高了效率。
  3. through方法提供了類似Spark的Shuffle機制,為使用不同分區策略的數據提供了Join的可能
  4. log compact提高了基於Kafka的state store的加載效率
  5. state store為狀態計算提供了可能
  6. 基於offset的計算進度管理以及基於state store的中間狀態管理為發生Consumer rebalance或Failover時從斷點處繼續處理提供了可能,並為系統容錯性提供了保障
  7. KTable的引入,使得聚合計算擁用了處理亂序問題的能力

參考資料:


  • http://www.infoq.com/cn/articles/kafka-analysis-part-7
  • https://docs.confluent.io/3.2.0/streams/introduction.html

Kafka Stream