Spark Structured Streaming + Kafka使用筆記

這篇部落格將會記錄Structured Streaming + Kafka的一些基本使用(Java 版)

spark 2.3.0

1. 概述

Structured Streaming (結構化流)是一種基於 Spark SQL 引擎構建的可擴充套件且容錯的 stream processing engine (流處理引擎)。可以使用Dataset/DataFrame API 來表示 streaming aggregations (流聚合), event-time windows (事件時間視窗), stream-to-batch joins (流到批處理連線) 等。

Dataset/DataFrame在同一個 optimized Spark SQL engine (優化的 Spark SQL 引擎)上執行計算後,系統通過 checkpointing (檢查點) 和 Write Ahead Logs (預寫日誌)來確保 end-to-end exactly-once (端到端的完全一次性) 容錯保證。

簡而言之,Structured Streaming 提供快速,可擴充套件,容錯,end-to-end exactly-once stream processing (端到端的完全一次性流處理),且無需使用者理解 streaming 。


2. 資料來源


groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.3.2


SparkSession spark =
SparkSession .builder() .appName("appName") .getOrCreate(); Dataset<Row> df = spark.readStream() .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic.*") .load(); df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"


這裡我們不需要自己設定group.id引數, Kafka Source 會將自動為每個查詢建立一個唯一的 group id


Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

對於批處理和流查詢,須為 Kafka source 設定以下選項。

Option value meaning
assign json string {“topicA”:[0,1],“topicB”:[2,4]} 指定 TopicPartitions 來消費。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項。
subscribe 逗號分隔的 topics 列表 要訂閱的 topic 列表。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項
subscribePattern Java regex string 用於訂閱 topic(s) 的 pattern(模式)。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項。
kafka.bootstrap.servers 逗號分隔的 host:port 列表 Kafka 中的 “bootstrap.servers” 配置。


Option value default query type meaning
startingOffsets “earliest”, “latest” (streaming only), or json string “”" {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-2}} “”" “latest” 用於 streaming, “earliest” 用於 batch(批處理) streaming 和 batch 當一個查詢開始的時候, 或者從最早的偏移量:“earliest”,或者從最新的偏移量:“latest”,或JSON字串指定為每個topicpartition起始偏移。在json中,-2作為偏移量可以用來表示最早的,-1到最新的。注意:對於批處理查詢,不允許使用最新的查詢(隱式或在json中使用-1)。對於流查詢,這隻適用於啟動一個新查詢時,並且恢復總是從查詢的位置開始,在查詢期間新發現的分割槽將會盡早開始。
endingOffsets latest or json string {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}} latest batch query 當一個批處理查詢結束時,或者從最新的偏移量:“latest”, 或者為每個topic分割槽指定一個結束偏移的json字串。在json中,-1作為偏移量可以用於引用最新的,而-2(最早)是不允許的偏移量。
failOnDataLoss true or false true streaming query 當資料丟失的時候,這是一個失敗的查詢。(如:主題被刪除,或偏移量超出範圍。)這可能是一個錯誤的警報。當它不像你預期的那樣工作時,你可以禁用它。如果由於資料丟失而不能從提供的偏移量中讀取任何資料,批處理查詢總是會失敗。
kafkaConsumer.pollTimeoutMs long 512 streaming and batch 在執行器中從卡夫卡輪詢執行資料,以毫秒為超時間隔單位。
fetchOffset.numRetries int 3 streaming and batch 放棄獲取卡夫卡偏移值之前重試的次數。
fetchOffset.retryIntervalMs long 10 streaming and batch 在重新嘗試取回Kafka偏移量之前等待毫秒值。
maxOffsetsPerTrigger long none streaming and batch 對每個觸發器間隔處理的偏移量的最大數量的速率限制。偏移量的指定總數將按比例在不同卷的topic分割槽上進行分割。

3. 解析資料



Dataset<Row> tboxDataSet = rawDataset
	.where("topic = my_topic")
	.select(functions.from_json(functions.col("value").cast("string"), tboxScheme).alias("parsed_value"))


4. 時間視窗


Dataset<Row> windowtboxDataSet = tboxDataSet
	.withWatermark("timestamp", "5 seconds")
	.groupBy(functions.window(functions.col("timestamp"), "10 minutes", "5 minutes"),


4.1 簡易例子


ataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),

  • 我們有一系列 arriving 的 records
  • 首先是一個對著時間列timestamp做長度為10m,滑動為5m的window()操作
    • 例如上圖右上角的虛框部分,當達到一條記錄 12:22|dog 時,會將 12:22 歸入兩個視窗 12:15-12:2512:20-12:30,所以產生兩條記錄:12:15-12:25|dog12:20-12:30|dog,對於記錄 12:24|dog owl 同理產生兩條記錄:12:15-12:25|dog owl12:20-12:30|dog owl
    • 所以這裡 window() 操作的本質是 explode(),可由一條資料產生多條資料
  • 然後對window()操作的結果,以window列和 word列為 key,做groupBy().count()操作
    • 這個操作的聚合過程是增量的(藉助 StateStore)
  • 最後得到一個有 window, word, count 三列的狀態集

4.2 OutputModes

我們繼續來看前面 window() + groupBy().count() 的例子,現在我們考慮將結果輸出,即考慮 OutputModes:

4.2.1 Complete

Complete 的輸出是和 State 是完全一致的:


4.2.2 Append

Append 的語義將保證,一旦輸出了某條 key,未來就不會再輸出同一個 key。


所以,在上圖 12:10 這個批次直接輸出 12:00-12:10|cat|1, 12:05-12:15|cat|1 將是錯誤的,因為在 12:20 將結果更新為了 12:00-12:10|cat|2,但是 Append 模式下卻不會再次輸出 12:00-12:10|cat|2,因為前面輸出過了同一條 key 12:00-12:10|cat 的結果12:00-12:10|cat|1

為了解決這個問題,在 Append 模式下,Structured Streaming 需要知道,某一條 key 的結果什麼時候不會再更新了。當確認結果不會再更新的時候(下一篇文章專門詳解依靠 watermark 確認結果不再更新),就可以將結果進行輸出。


如上圖所示,如果我們確定 12:30 這個批次以後不會再有對 12:00-12:10 這個 window 的更新,那麼我們就可以把 12:00-12:10 的結果在 12:30 這個批次輸出,並且也會保證後面的批次不會再輸出 12:00-12:10 的 window 的結果,維護了 Append 模式的語義。

4.2.3 Update

Update 模式已在 Spark 2.1.1 及以後版本獲得正式支援。


如上圖所示,在 Update 模式中,只有本執行批次 State 中被更新了的條目會被輸出:

  • 在 12:10 這個執行批次,State 中全部 2 條都是新增的(因而也都是被更新了的),所以輸出全部 2 條;
  • 在 12:20 這個執行批次,State 中 2 條是被更新了的、 4 條都是新增的(因而也都是被更新了的),所以輸出全部 6 條;
  • 在 12:30 這個執行批次,State 中 4 條是被更新了的,所以輸出 4 條。這些需要特別注意的一點是,如 Append 模式一樣,本執行批次中由於(通過 watermark 機制)確認 12:00-12:10 這個 window 不會再被更新,因而將其從 State 中去除,但沒有因此產生輸出。

4.3 Watermark 機制


  • (a+) 在對 event time 做 window() + groupBy().aggregation() 即利用狀態做跨執行批次的聚合,並且
  • (b+) 輸出模式為 Append 模式或 Update 模式

時,Structured Streaming 將依靠 watermark 機制來限制狀態儲存的無限增長、並(對 Append 模式)儘早輸出不再變更的結果。

換一個角度,如果既不是 Append 也不是 Update 模式,或者是 Append 或 Update 模式、但不需狀態做跨執行批次的聚合時,則不需要啟用 watermark 機制。

具體的,我們啟用 watermark 機制的方式是:

    val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
    // Group the data by window and word and compute the count of each group
    val windowedCounts = words
        .withWatermark("timestamp", "10 minutes")  // 注意這裡的 watermark 設定!
            window($"timestamp", "10 minutes", "5 minutes"),

這樣即告訴 Structured Streaming,以 timestamp 列的最大值為錨點,往前推 10min 以前的資料不會再收到。這個值 —— 當前的最大 timestamp 再減掉 10min —— 這個隨著 timestamp 不斷更新的 Long 值,就是 watermark。 img


  • 12:20 這個批次結束後,錨點變成了 12:20|dog owl 這條記錄的 event time 12:20 ,watermark 變成了 12:20 - 10min = 12:10
  • 所以,在 12:30 批次結束時,即知道 event time 12:10 以前的資料不再收到了,因而 window 12:00-12:10 的結果也不會再被更新,即可以安全地輸出結果 12:00-12:10|cat|2
  • 在結果 12:00-12:10|cat|2 輸出以後,State 中也不再儲存 window 12:00-12:10 的相關資訊 —— 也即 State Store 中的此條狀態得到了清理。

5. 輸出

5.1 StreamingQuery定義

定義完 final result DataFrame/Dataset ,剩下的就是開始 streaming computation 。 為此,我們須使用 DataStreamWriter 通過 Dataset.writeStream() 返回。

  • Output mode (輸出模式): 指定寫入 output sink 的內容,即上文提到的complete, append, update模式
  • Query name (查詢名稱): 可選,指定用於標識的查詢的唯一名稱。
  • Trigger interval (觸發間隔): 可選,指定觸發間隔。 如果未指定,則系統將在上一次處理完成後立即檢查新資料的可用性。 如果由於先前的處理尚未完成而導致觸發時間錯誤,則系統將嘗試在下一個觸發點觸發,而不是在處理完成後立即觸發。
  • Checkpoint location (檢查點位置): 對於可以保證 end-to-end fault-tolerance (端對端容錯)能力的某些 output sinks ,請指定系統將寫入所有 checkpoint (檢查點)資訊的位置。 這是與 HDFS 相容的容錯檔案系統中的目錄。


  • Append mode (default) - 這是預設模式,其中只有 自從上一次觸發以來,新增到 Result Table 的新行將會是 outputted to the sink 。 只有新增到 Result Table 的行將永遠不會改變那些查詢才支援這一點。即上文提到的一旦輸出了某條 key,未來就不會再輸出同一個 key。 因此,這種模式 保證每行只能輸出一次(假設 fault-tolerant sink )。例如,只有 select, where, map, flatMap, filter, join 等查詢支援 Append mode 。
  • Complete mode - 每次觸發後,整個 Result Table 將被輸出到 sink 。 aggregation queries (聚合查詢)支援這一點。
  • Update mode - (自 Spark 2.1.1 可用) 只有 Result Table rows 自上次觸發後更新將被輸出到 sink 。

5.2 Output Sinks


  • **File sink ** - 將輸出儲存到目錄中。
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
  • Foreach sink - 對 output 中的記錄執行 arbitrary computation ,一般很常用,可以將流資料儲存到資料庫等,詳細用法後面會提到
  • **Console sink (for debugging) ** - 每次觸發時,將輸出列印到 console/stdout 。 都支援 Append 和 Complete 輸出模式。 這應該用於低資料量的除錯目的,因為在每次觸發後,整個輸出被收集並存儲在驅動程式的記憶體中。
  • **Memory sink (for debugging) ** - 輸出作為 in-memory table (記憶體表)儲存在記憶體中。都支援 Append 和 Complete 輸出模式。 這應該用於除錯目的在低資料量下,整個輸出被收集並存儲在驅動程式的儲存器中。因此,請謹慎使用。
  • Kafka sink將資料輸出至Kafka
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

Dataframe 寫入 Kafka 應該在 schema(模式)中有以下列:

Column Type
key (optional) string or binary
value (required) string or binary
topic (*optional) string

某些 sinks 是不容錯的,因為它們不能保證輸出的永續性並且僅用於除錯目的。參見前面的部分 容錯語義 。以下是 Spark 中所有接收器的詳細資訊。

Sink (接收器) Supported Output Modes (支援的輸出模式) Options (選項) Fault-tolerant (容錯) Notes (說明)
File Sink (檔案接收器) Append (附加) path: 必須指定輸出目錄的路徑。 有關特定於檔案格式的選項,請參閱 DataFrameWriter (Scala/Java/Python/R) 中的相關方法。 例如,對於 “parquet” 格式選項,請參閱 DataFrameWriter.parquet() Yes 支援對 partitioned tables (分割槽表)的寫入。按時間 Partitioning (劃分)可能是有用的。
Foreach Sink Append, Update, Compelete (附加,更新,完全) None 取決於 ForeachWriter 的實現。 更多詳細資訊在 下一節
Console Sink (控制檯接收器) Append, Update, Complete (附加,更新,完全) numRows: 每個觸發器需要列印的行數(預設:20) truncate: 如果輸出太長是否截斷(預設: true) No
Memory Sink (記憶體接收器) Append, Complete (附加,完全) None 否。但是在 Complete Mode 模式下,重新啟動的查詢將重新建立完整的表。 Table name is the query name.(表名是查詢的名稱)

5.3 Foreach

foreach 操作允許在輸出資料上計算 arbitrary operations 。從 Spark 2.1 開始,這隻適用於 Scala 和 Java 。為了使用這個,你必須實現介面 ForeachWriter 其具有在 trigger (觸發器)之後生成 sequence of rows generated as output (作為輸出的行的序列)時被呼叫的方法。 舉個例子:

// storage result into mongodb
                .queryName("mongodb" + collectionName)
                .foreach(new ForeachWriter<Row>() {

                    Map<String, String> writeOverrides = new HashMap<String, String>() {{
                        put("uri", MongoDbConfig.MONGO_DB_URI);
                        put("database", MongoDbConfig.MONGO_MOFANG_TSP_DATA_DB);
                        put("collection", collectionName);
                    WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
                    MongoConnector mongoConnector = null;
                    ArrayList<Row> list = null;

                    public void process(Row value) {

                    public void close(Throwable errorOrNull) {
                        if (!list.isEmpty()) {
                            mongoConnector.withCollectionDo(writeConfig, Document.class, (MongoCollection<Document> mongoCollection) -> {
                                for (Row row : list) {
                                    Map<String, Object> map = new HashMap<>();
                                    String[] fieldNames = row.schema().fieldNames();
                                    for (String s : fieldNames) {
                                        map.put(s, row.getAs(s));
                                    Document document = new Document(map);
                                return null;

                    public boolean open(long partitionId, long version) {
                        mongoConnector = MongoConnector.apply(writeConfig.asOptions());
                        list = new ArrayList<>();
                        return true;



  • writer 必須是 serializable (可序列化)的,因為它將被序列化併發送給 executors 執行。

  • open ,process 和 close 三個方法都會在executor上被呼叫。

  • 只有當呼叫 open 方法時,writer 才能執行所有的初始化(例如開啟連線,啟動事務等)。請注意,如果在建立物件時立即在類中進行任何初始化,那麼該初始化將在 driver 中發生(因為這是正在建立的例項)。

  • version 和 partition 是 open 中的兩個引數,它們獨特地表示一組需要被 pushed out 的行。 version 是每個觸發器增加的單調遞增的 id 。 partition 是一個表示輸出分割槽的 id ,因為輸出是分散式的,將在多個執行器上處理。

  • open 可以使用 version 和 partition 來選擇是否需要寫入行的順序。因此,它可以返回 true (繼續寫入)或 false ( 不需要寫入 )。如果返回 false ,那麼 process 不會在任何行上被呼叫。例如,在 partial failure (部分失敗)之後,失敗的觸發器的一些輸出分割槽可能已經被提交到資料庫。基於儲存在資料庫中的 metadata (元資料), writer 可以識別已經提交的分割槽,因此返回 false 以跳過再次提交它們。

  • 當 open 被呼叫時, close 也將被呼叫(除非 JVM 由於某些錯誤而退出)。即使 open 返回 false 也是如此。如果在處理和寫入資料時出現任何錯誤,那麼 close 將被錯誤地呼叫。我們有責任清理以 open 建立的狀態(例如,連線,事務等),以免資源洩漏。

6. 最後


 // await for termination
 try {
} catch (StreamingQueryException e) {


StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates

7. Reference