1. 程式人生 > 實用技巧 >Kafka Stream 高階應用

Kafka Stream 高階應用

9.1將Kafka 與其他資料來源整合

對於第一個高階應用程式示例,假設你在金融服務公司工作。公司希望將其現有資料遷移到新技術實現的系統中,該計劃包括使用 Kafka。資料遷移了一半,你被要求去更新公司的分析系統,其目的是實時顯示最新的股票交易和與之關聯的相關資訊,對於這種應用場景 Kafka Streams非常適合。
公司專注於提供金融市場不同領域的基金,該公司將基金交易實時記錄在關係型資料庫中,同時他們計劃最終將交易直接寫入Kafka,但是在短期內,資料庫依然是記錄系統。
假設傳入的資料被插入關係型資料庫中,那麼如何縮小資料庫與新興的 Kafka Streams應用程式之間的差距呢?答案是使用 Kafka Connect,它是 Apache Kafka的一部分,是將 Kafka與其他系統整合的框架。一旦 Kafka有資料,你將不再關心源資料的位置,而只需將 Kafka Streams應用程式指向源主題,就像其他 Kafka Streams應用程式一樣處理。
注意當使用 Kafka Connect從其他源獲取資料時,整合點就是 Kafka的主題。這意味著任何使用Kafka Consumer的應用程式都可以使用匯入的資料。
下圖展示了資料庫與 Kafka之間的整合是如何實現的。在本例中,將使用Kafka Connect來監控資料庫表和流更新,並將它們寫入 Kafka主題,該主題是金融分析應用程式的源。

9.1.1使用Kafka Connect整合資料

Kafka Connect設計的目的是將資料從其他系統流入 Kafka,以及將資料從 Kafka流入另一個數據儲存應用程式,例如 MongoDB或 Elasticsearch。使用 Kafka Connect可以將整個資料庫導 Kafka,或者其他資料,如效能指標。
Kafka Connect使用特定的聯結器與外部資料來源互動,幾種可用的聯結器參考 Confluent官網。很多聯結器都是由聯結器社群開發的,使得 Kafka幾乎可以與其他任何儲存系統進行整合。如果沒有你想要的聯結器,那麼你可以自己實現一個。

9.1.2配置 Connect

Kafka Connect有兩種執行模式,即分散式模式和獨立模式

。對於大多數生產環境,以分散式模式執行是有意義的,因為當執行多個聯結器例項時可以利用其並行性和容錯性。這裡,我們假設你在本機執行示例,因此所有的配置都是基於獨立模式的。
Kafka Connect用來與外部資料來源互動的聯結器有兩種型別,即源聯結器( source connector)和接收器聯結器( sink connector)下圖演示了 Kafka Connect如何使用這兩種型別的聯結器,正如所看到的,源聯結器將資料寫入 Kafka,而接收器聯結器從 Kafka接收資料供其他系統使用。

對於本例,將使用 Kafka JDBC聯結器。該聯結器可以在 GitHub官網上找到,為了方便我將該聯結器打包在原始碼中。
使用Kafka Conecet時,你需要對Kafka Connect自身以及用於匯入或匯出資料的單個聯結器做少量配置。首先,讓我們來看一下要用到的 Kafka Connect的配置引數。
■bootstrap.servers——Kafka Connect使用的 Kafka代理列表,多個代理之間以逗號隔開。
■key.converter——類轉換器,該轉換器控制訊息的鍵從 Kafka Connect格式到寫人Kafka的格式的序列化。
■value.converter——類轉換器,該轉換器控制訊息的值由 Kafka Connect格式到寫Kafka的格式的序列化。例如,可以使用內建的org. apache. kafka. connect.json. JsonConverter。
■value.converter. schemas. enable——true或者 false,指定 Kafka Connect是否包含值的模式。對於本例,將其值設定為fase,在下一節再解釋這樣設定的原因。

■plugin.path——告訴Kafka Connect所使用的聯結器及其依賴項的位置。此位置可以是單個、包含一個JAR檔案或多個JAR檔案的頂級目錄。也可以提供多條路徑,這些路徑由逗號分隔的位置列表表示。
■offset. storage.file. filename——包含Kafka包含 Connect的消費者儲存的偏移量的檔案。還需要為JDBC聯結器提供一些配置,這些配置引數說明如下。
■name——聯結器的名稱。
■connector. class——聯結器的類。
■tasks.max——聯結器使用的最大任務數。
■connection.url——用於連線資料庫的URL
■mode——JDBC源聯結器用於檢測變化的方法。
■incrementing.column.name——被跟蹤的用於檢測變化的列名。
■topic.prefix——Kafka Connect將每張表的資料寫入名為“topic. prefix+表名的主題。
這些配置中的大多數都很簡單,但我們仍需要對這些配置中的mode和incrementing. column.name兩個配置進行詳細討論,因為它們在聯結器的執行中起著積極作用。JDBC源聯結器使用mode配置項來檢測需要載入哪些行。本示例中該配置項被設定為incrementing,它依賴於一個自增列,每次插入一條記錄時該列的值加1。通過跟蹤遞增列,只拉取新插入的記錄,更新操作將被忽略。你的 Kafka Streams應用程式只拉取最新的股票購買,因此這種設定是很理想的。配置項 incrementing. column.name是指包含自增值的列名。

提示本書的原始碼包含 Kafka Connect和JDBC聯結器的近乎完整的配置,配置檔案位於本書原始碼的src/main/resources目錄下。你需要提供一些關於提取原始碼資源庫路徑的資訊,仔細閱讀 README.md(點選進行下載)檔案中的詳細說明。

9.1.3轉換資料

在獲得這個任務之前,你已經使用類似的資料開發了一個 Kafka Streams應用程式,因此已經有了現成的模型和 Serde物件(底層使用Gson進行JSON的序列化與反序列化)。為了保持較快的開發速度,你不希望編寫任何新的程式碼來支援使用 Kafka Connect。正如從下一節所看到的,你將能夠從 Kafka Connect中無縫匯入資料。
【提示:Gson是一個由谷歌 Apache公司開發的權庫,用於將Java物件序列化為json以及將json反序列化為Java物件。你可以從使用者指南中瞭解更多。】
為了實現這種無縫整合,需要對JDBC聯結器的屬性做一些較小的額外配置變更。在修改之前,讓我們回顧一下9.1.2節介紹的配置項。具體來講,在前面我說過使用org. apache.kafka.connect.kafka.connect.json.JsonConverter,並將模式禁用,值就會被轉換為簡單的JSON格式。
儘管JSON是你想在 Kafka Streams應用程式中使用的,但存在以下兩個問題。
第一,當將資料轉換為JSON格式時,列名將是轉換後的JSON字串欄位的名稱,這些名稱都是BSE內部縮寫的格式,在公司外部沒有任何意義。因此當 Gson serde從JSON轉換到期望的模型物件時,該物件的所有欄位均為空,因為JSON字串中的欄位名與該物件的欄位名不匹配。
第二,和預期一樣,儲存在資料庫中的日期和時間是時間戳型別的,但是所提供的 Gson serde並沒有為Date型別定義一個自定義的 TypeAdapte,因此所有日期都需要轉換為格式類似 yyyy-dd'T':mm:ss.SS-0400的字串。幸運的是, Kafka Connect提供了一種機制,能夠很輕鬆地解決這兩個問題。

Kafka Connect有轉換的設計思想,允許在 Kafka Connect將資料寫入 Kafka之前對資料做一些輕量的轉換。圖9-3展示了這個轉換過程發生的地方。
本示例中,將使用兩個內建的轉換操作類,即 TimestampConvert和 ReplaceField。如前所述,要使用這些轉換類,需要在 connector-jdbc- properties配置檔案中新增程式碼清單所示的幾行配置

name=stock-transaction-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:h2:tcp://localhost:9989/~/findata
mode=incrementing
incrementing.column.name=TXN_ID
topic.prefix=dbTxn

#類名轉化器
transforms=ConvertDate,Rename
#日期轉換器ConvertDate的別名型別
transforms.ConvertDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
#待轉換日期欄位
transforms.ConvertDate.field=TXNTS
#日期轉換後輸出的型別
transforms.ConvertDate.target.type=string
#日期的格式
transforms.ConvertDate.format=yyyy-MM-dd'T'HH:mm:ss.SSS-0400
#重新命名轉換器Rename的別名型別
transforms.Rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
#需要替換的列表名
transforms.Rename.renames=SMBL:symbol, SCTR:sector, INDSTY:industry, SHRS:shares, SHRPRC:sharePrice, CSTMRID:customerId, TXNTS:transactionTimestamp

這些屬性是相對自描述性的,因此我們不必在它們上面花太多時間。如你所見,它們恰好提供了你需要 Kafka Streams應用程式提供的對於由 Kafka Connect和JDBC聯結器匯入 Kafka的訊息成功進行反序列化。當所有 Kafka Connect元件就緒之後,要完成資料庫表與 Kafka Streams應用程式的整合,只需使用具有 connector-c-jdbc.properties檔案中指定的字首的主題。

 //StockTransaction物件的序列化和反序列化器
        Serde<StockTransaction> stockTransactionSerde = StreamsSerdes.StockTransactionSerde();

//使用Kafka Connect寫入記錄的主題作為流的源
        builder.stream("dbTxnTRANSACTIONS",  Consumed.with(stringSerde, stockTransactionSerde))
                     //KStream<K,V> peek​(ForeachAction<? super K,? super V> action)對KStream的每個記錄執行一個操作。這是無狀態逐記錄操作
                     //此處將訊息列印到控制檯
                      .peek((k, v)-> LOG.info("transactions from database key {} value {}",k, v))

此時,你正在使用 Kafka Streams處理來自資料庫表中的記錄,但是還有更多的事情要做。你正在通過流式處理採集股票交易資料,為了分析這些交易資料,你需要按股票程式碼將交易資料進行分組。
我們已經知道了如何選擇鍵並對記錄重新分割槽,但如果記錄在寫入 Kafka時帶有鍵則效率更高,因為Kafka Streams應用程式可以跳過重新分割槽的步驟,這就節省了處理時間和磁碟空間。讓我們再回顧一下 Kafka Connect的配置。
首先,你可以新增一個 ValueToKey轉換器,該轉換器根據所指定的欄位名列表從記錄的值中提取相應欄位,以用於鍵。更新 connector--dbc. properties檔案內容如程式碼清單:

#增加ExtractKey轉換器
transforms=ConvertDate,Rename,ExtractKey
transforms.ConvertDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertDate.field=TXNTS
transforms.ConvertDate.target.type=string
transforms.ConvertDate.format=yyyy-MM-dd'T'HH:mm:ss.SSS-0400
transforms.Rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.Rename.renames=SMBL:symbol, SCTR:sector, INDSTY:industry, SHRS:shares, SHRPRC:sharePrice, CSTMRID:customerId, TXNTS:transactionTimestamp
#指定ExtractKey轉化器的型別
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ValueToKey
#流出需要抽取的欄位別名以用作鍵的欄位名
transforms.ExtractKey.fields=symbol

添加了一個別名為 ExtractKey的轉換器並通知 Kafka Connect轉換器對應的類名為ValueTokey.同時提供用於鍵的欄位名為 symbol,它可以由多個以逗號分隔的值組成,但本例只需要提供一個值。注意,這裡的欄位名是原欄位重新命名之後的版本,因為這個轉換器是在重新命名轉換器轉換之後才執行的。
ExtractKey提取的欄位結果是一個包含一個值的結構,但是你只想鍵對應的值即股票程式碼包括在結構中,為此可以新增一個FlattenStruct轉換器將股票程式碼提取出來。

#增加最後一個轉換器
transforms=ConvertDate,Rename,ExtractKey,FlattenStruct
transforms.ConvertDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertDate.field=TXNTS
transforms.ConvertDate.target.type=string
transforms.ConvertDate.format=yyyy-MM-dd'T'HH:mm:ss.SSS-0400
transforms.Rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.Rename.renames=SMBL:symbol, SCTR:sector, INDSTY:industry, SHRS:shares, SHRPRC:sharePrice, CSTMRID:customerId, TXNTS:transactionTimestamp
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ExtractKey.fields=symbol
#指定轉換器對應的類
transforms.FlattenStruct.type=org.apache.kafka.connect.transforms.ExtractField$Key
#帶抽取的欄位名稱
transforms.FlattenStruct.field=symbol

以上程式碼清單中添加了最後一個別名為FlattenStruct的轉換器,並指定該轉換器對應的型別為 ExtractFieldKey類,Kafka Connect使用該類來提取指定的欄位,並且在結果中只包括該欄位(在本例,該欄位為鍵)。最後提供了欄位名稱,本例指定該名稱為 symbol,和前一個轉換器指定的欄位一樣,這樣做是有意義的,因為這是用來建立鍵結構的欄位。
只需要增加幾行配置,就可以擴充套件之前的 Kafka Streams應用程式以執行更高階的操作,而無須選擇鍵並執行重新分割槽的步驟,如程式碼:

        //StockTransaction物件的序列化和反序列化器
        Serde<StockTransaction> stockTransactionSerde = StreamsSerdes.StockTransactionSerde();
       
        StreamsBuilder builder = new StreamsBuilder();



        //使用Kafka Connect寫入記錄的主題作為流的源
        builder.stream("dbTxnTRANSACTIONS",Consumed.with(stringSerde, stockTransactionSerde))
                     //KStream<K,V> peek​(ForeachAction<? super K,? super V> action)對KStream的每個記錄執行一個操作。這是無狀態逐記錄操作
                     //此處將訊息列印到控制檯
                      .peek((k, v)-> LOG.info("transactions from database key {} value {}",k, v))
                      //按鍵進行分組
                      .groupByKey(Serialized.with(stringSerde, stockTransactionSerde))
                /*
                 * KTable<K,VR> aggregate​(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator,Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized))
                 * 通過分組鍵聚合此流中的記錄值。具有空鍵或值的記錄將被忽略。
                 * initializer-用於計算初始中間聚合結果的Initializer
                 * aggregator-一個計算新聚合結果的聚合器,可用於實現諸如計數之類的聚合功能。
                 * Materialized-例項化的例項化,用於例項化狀態儲存。不能為null.with():使用提供的鍵和值Serdes例項化StateStore。
                 * 
                 * 在處理第一個輸入記錄之前,立即一次應用指定的Initializer,以提供用於處理第一個記錄的初始中間聚合結果。指定的Aggregator將應用於每個輸入記錄,並使用當前聚合(或使用通過Initializer提供的中間聚合結果用於第一條記錄)來計算新的聚合和記錄的值。
                 * 因此,aggregate(Initializer,Aggregator,Materialized)可用於計算諸如count(c.f. count())之類的聚合函式。
                 */
                      .aggregate(()-> 0L,(symb, stockTxn, numShares) -> numShares + stockTxn.getShares(),
                              Materialized.with(stringSerde, longSerde)).toStream()
                             .peek((k,v) -> LOG.info("Aggregated stock sales for {} {}",k, v))
                             .to( "stock-counts", Produced.with(stringSerde, longSerde));

因為資料傳入時就帶有鍵,所以可以適用groupByKey,它不會設定自動重新分割槽的標誌位。通過分組操作,可以直接進行一個聚合操作而無須重新分割槽。

9.2替代資料庫

在第4章中,我們學習瞭如何向 Kafka Streams應用程式新增本地狀態。流式應用程式需要使用狀態來執行類似聚合、歸約和連線的操作除非流式應用程式只處理單條記錄,否則就需要本地狀態。
根據第四章的需求的需求,我們已經開發了一個 Kafka Streams應用程式,它獲取股票交易的3類資訊:
■市場交易總額;
■客戶每次購買股票的數量;
■在視窗大小為10秒的翻轉視窗中,每支股票的總成交量。
到目前為止,在所有的示例中檢視程式執行結果的方式有兩種,一是通過控制檯檢視,二是從接收器主題中讀取結果。在控制檯檢視資料適合開發環境,但控制檯並不是展示結果的最佳方式。如果要做任何分析工作或者快速理解發生了什麼,儀表板應用程式是最好的展現方式。
本節將會介紹如何在 Kafka Streams中使用互動式查詢來開發一個用於檢視分析結果的儀表板應用程式,而不需要關係型資料庫來儲存狀態。直接將Kafka Streams作為資料流提供給儀表板應用程式。因此,儀表板應用程式中的資料自然會不斷更新。
在一個典型的架構中,捕獲和操作的資料會被推送到關係型資料庫中以用於檢視圖9-4展示了這種架構:在使用 Kafka Streams之前,通過 Kafka攝取資料,併發送給一個分析引擎,然後分析引擎將處理結果寫入資料庫,以提供給儀表板應用程式使用。

如果增加Kafka Streams使用本地狀態,那麼就要對上圖架構進行修改,如下圖所示刪掉整個叢集,可以顯著簡化架構。Kafka Streams依然將資料寫回Kafka,並且資料庫仍然是已轉換資料的主要使用者

互動式查詢可以讓你直接檢視狀態儲存中的資料,而不必先從Kafka中消費這些資料,換句話說流也成為資料,於是就有了以下調整。

Kafka Streams通過REST風格介面從流式應用程式外部提供只讀訪問。值得重申的是這個構想是這麼強大:你可以檢視流的執行狀態而不需要一個外部資料庫。

9.2.1互動式查詢工作原理

要使互動式查詢生效,Kafka Streams需要在只讀包裝器中公開狀態儲存。重點是要理解:雖然 Kafka Streams讓狀態儲存可以被查詢,但並沒有提供任何方式來更新和修改狀態儲存。 Kafka Streams通過Kafkastreams.store方法公開狀態儲存。
下面的程式碼片段是 store方法的使用示例:
ReadonlyWindowstore readonlystore = kafkastreams.store(storeName, Queryables.windowStore());
該示例檢索一個 WindowStore, QueryablestoreTypes還提供另外兩種型別的方法:
■QueryableStoreTypes.sessionStore();
■QueryableStoreTypes.keyValueStore();
一旦有了對只讀狀態儲存的引用,只需要將該狀態儲存公開給一個提供給使用者查詢流資料狀態的服務即可(例如一個REST風格的服務)但是檢索狀態儲存只是整個構想的一部分,這裡提取的狀態儲存將只包含本地儲存中包含的鍵。
注意:請記住, Kafka Streams為每個任務分配一個狀態儲存,只要使用同一個應用程式ID, Kafka Streams應用程式就可以由多個例項組成。此外這些例項並不需要都位於同一臺主機上。因此,有可能你查詢到的狀態儲存僅包含所有鍵的一個子集,其他狀態儲存(具有相同名稱,但位於其他機器上)可能包含鍵的另一個子集。
讓我們使用前面列出的分析來明確這個構想。

9.2.2分配狀態儲存

先看看第一個分析,按市場板塊聚合股票交易因為要進行聚合,所以狀態儲存將發揮作用。你希望公開狀態儲存,以提供每個市場板塊成交量的實時檢視,以深入瞭解目前市場哪個板塊最活躍。股票市場活動產生大量的資料,但我們只討論使用兩個分割槽來保持示例的詳細資訊。另外,假設你在位於同一個資料中心的兩臺獨立的機器上執行兩個單執行緒例項,由於 Kafka Streams的自動負載均衡功能,每個應用程式將有一個任務來處理來自輸入主題的每個分割槽的資料。
下圖展示了任務與狀態儲存的分配情況。正如你所看到的,例項A處理分割槽0上的所有記錄,而例項B處理分割槽1上的所有記錄。

"Energy":100000"分配到例項A的狀態儲存中,"Finance":110000分配到例項B的狀態儲存中。回到為了查詢而公開狀態儲存的示例,可以清楚地看到,如果將例項A上的狀態儲存公開給Web服務或任何外部查詢,則只能檢索到"Energy"鍵對應的值。
如何解決這個問題呢?你肯定不想建立一個單獨的Web服務來查詢每個例項—這種方式擴充套件性差。幸運的是你不必這樣做, Kafka Streams提供了一種就像設定配置一樣簡單的解決方案。

9.2.3建立和查詢分散式狀態儲存

若要啟用互動式查詢,需要設定 StreamsConfig. APPLICATION_SERVER_CONFIG引數,它包括 Kafka Streams應用程式的主機名及查詢服務將要監聽的埠,格式為 hostname:port。
當一個 Kafka Streams例項接收到給定鍵的查詢時,需要找出該鍵是否被包含在本地狀態儲存中。更重要的是,如果在本地沒找到,那麼你希望找到哪個例項包含該鍵並查詢該例項的狀態儲存。
KafkaStreams物件的幾個方法允許檢索由 APPLICATION_ SERVER_CONFIG定義的、具有相同應用程式ID所有執行例項的資訊。表9-1列出了這些方法名及其描述。表9-1檢索儲存元資料的方法

方法名

引數

用途

allMetadata

無引數

檢索所有例項,有些可能是遠端例項

allMetadataForstore

儲存的名稱

檢索包含指定儲存的所有例項(有些是遠端例項)

allMetadataForKey

鍵,Serializer

檢索包含有鍵儲存的所有例項(有些是遠端例項)

allMetadataForKey

鍵,StreamPartitioner

檢索包含有鍵儲存的所有例項(有些是遠端例項)


可以使用 KafkaStreams.allMetadata方法獲取有資格進行互動式查詢的所有例項的資訊。KafkaStreams.allMetadataForKey方法是我在寫互動式查詢時最常用的方法。
接下來,讓我們再看一下鍵/值在 Kafka Streams例項中的分佈,增加了檢查鍵"Finance"過程的順序,該鍵從另一個例項找到並返回。每一個Kafka Streams例項都內建一個輕量的伺服器,監聽 APPLICATION_ SERVERCONFIG中指定的埠。

需要重點指出的是:你只需要查詢 Kafka Streams某一個例項,至於查詢哪一個例項並不重要(前提是你已經正確配置了應用程式)通過使用RPC機制和元資料檢索方法,如果查詢的例項不包含待查詢的資料,則該例項會找到資料所在的位置,並提取結果,然後將結果返回給原始查詢
通過跟蹤上圖中的呼叫流,你可以在實際操作中看到這一點。例項A並不包含鍵“Finance”,但發現例項B包含該鍵,因此,例項A向例項B內建的伺服器發起一次方法呼叫,該方法檢素資料並將結果返回給原始的查詢。

9.2.4編寫互動式查詢

 public static void main(String[] args) {

        if(args.length < 2){
            LOG.error("Need to specify host, port");
            System.exit(1);
        }

        String host = args[0];
        int port = Integer.parseInt(args[1]);
        final HostInfo hostInfo = new HostInfo(host, port);

        Properties properties = getProperties();
        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host+":"+port);

現在需要提供兩個引數(主機名和埠),但這種更改影響微乎其微。你還可以嵌入本地伺服器執行查詢:對於這個實現,我選擇Spark Web伺服器。當然,如果你不喜歡Spark Web伺服器,請隨意替換為另一個Web伺服器

現在,讓我們看一下嵌入Spark伺服器的程式碼:

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
        InteractiveQueryServer queryServer = new InteractiveQueryServer(kafkaStreams, hostInfo);
        StateRestoreHttpReporter restoreReporter = new StateRestoreHttpReporter(queryServer);

        queryServer.init();

        kafkaStreams.setGlobalStateRestoreListener(restoreReporter);

        kafkaStreams.setStateListener(((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                LOG.info("Setting the query server to ready");
                queryServer.setReady(true);
            } else if (newState != KafkaStreams.State.RUNNING) {
                LOG.info("State not RUNNING, disabling the query server");
                queryServer.setReady(false);
            }
        }));

        kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
            LOG.error("Thread {} had a fatal error {}", t, e, e);
            shutdown(kafkaStreams, queryServer);
        });


        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            shutdown(kafkaStreams, queryServer);
        }));

        LOG.info("Stock Analysis KStream Interactive Query App Started");
        kafkaStreams.cleanUp();
        kafkaStreams.start();
    }

在這段程式碼中,建立了一個InteractiveQueryServer例項,它是一個包裝類,包含Spark Web伺服器和管理Web服務呼叫以及啟動和停止Web伺服器的程式碼。
第7章討論過使用狀態監聽器來通知一個 Kafka Streams應用程式的各種狀態,在這裡可以看到這個監聽器的有效使用。回想一下,當在執行互動式查詢時,需要使用 Streams Metadata例項來確定給定鍵的資料是否是正在處理查詢的例項的本地資料。將查詢伺服器的狀態設定為true,僅當在應用程式處於執行狀態時才允許訪問所需要的元資料。
要記住的一個關鍵點是返回的元資料是由 Kafka Streams應用程式組成的快照。在任何時候,你都可以伸縮應用程式。當這種情況發生時(或者,在其他任何合格事件發生時,如通過正則表示式來新增源節點的主題), Kafka Streams應用程式經歷再平衡階段,可能會更改分割槽的分配。在本示例中,只有處於執行狀態時才允許查詢,但可以隨意使用任何你認為合適的策略。
接下來是第7章中涉及的另一個例子:設定一個未捕獲的異常處理器。在本示例中,將記錄錯誤並關閉應用程式和查詢伺服器。因為這個應用程式無限期地執行,所以新增一個關閉鉤子用來當停止示例時關閉所有程式。