1. 程式人生 > >Spark Structured Streaming + Kafka使用筆記

Spark Structured Streaming + Kafka使用筆記

這篇部落格將會記錄Structured Streaming + Kafka的一些基本使用(Java 版)

spark 2.3.0

1. 概述

Structured Streaming (結構化流)是一種基於 Spark SQL 引擎構建的可擴充套件且容錯的 stream processing engine (流處理引擎)。可以使用Dataset/DataFrame API 來表示 streaming aggregations (流聚合), event-time windows (事件時間視窗), stream-to-batch joins (流到批處理連線) 等。

Dataset/DataFrame在同一個 optimized Spark SQL engine (優化的 Spark SQL 引擎)上執行計算後,系統通過 checkpointing (檢查點) 和 Write Ahead Logs (預寫日誌)來確保 end-to-end exactly-once (端到端的完全一次性) 容錯保證。

簡而言之,Structured Streaming 提供快速,可擴充套件,容錯,end-to-end exactly-once stream processing (端到端的完全一次性流處理),且無需使用者理解 streaming 。

具體原理可以看之前的這篇部落格

2. 資料來源

對於Kafka資料來源我們需要在Maven/SBT專案中引入:

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.3.2

首先我們需要建立SparkSession及開始接收資料,這裡以Kafka資料為例

SparkSession spark =
SparkSession .builder() .appName("appName") .getOrCreate(); Dataset<Row> df = spark.readStream() .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic.*") .load(); df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"
)

這裡我們建立了SparkSession並訂閱了幾個host的Kafka。

這裡我們不需要自己設定group.id引數, Kafka Source 會將自動為每個查詢建立一個唯一的 group id

Kafka源資料中的schema如下:

Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

對於批處理和流查詢,須為 Kafka source 設定以下選項。

Option value meaning
assign json string {“topicA”:[0,1],“topicB”:[2,4]} 指定 TopicPartitions 來消費。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項。
subscribe 逗號分隔的 topics 列表 要訂閱的 topic 列表。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項
subscribePattern Java regex string 用於訂閱 topic(s) 的 pattern(模式)。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項。
kafka.bootstrap.servers 逗號分隔的 host:port 列表 Kafka 中的 “bootstrap.servers” 配置。

以下配置是可選的:

Option value default query type meaning
startingOffsets “earliest”, “latest” (streaming only), or json string “”" {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-2}} “”" “latest” 用於 streaming, “earliest” 用於 batch(批處理) streaming 和 batch 當一個查詢開始的時候, 或者從最早的偏移量:“earliest”,或者從最新的偏移量:“latest”,或JSON字串指定為每個topicpartition起始偏移。在json中,-2作為偏移量可以用來表示最早的,-1到最新的。注意:對於批處理查詢,不允許使用最新的查詢(隱式或在json中使用-1)。對於流查詢,這隻適用於啟動一個新查詢時,並且恢復總是從查詢的位置開始,在查詢期間新發現的分割槽將會盡早開始。
endingOffsets latest or json string {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}} latest batch query 當一個批處理查詢結束時,或者從最新的偏移量:“latest”, 或者為每個topic分割槽指定一個結束偏移的json字串。在json中,-1作為偏移量可以用於引用最新的,而-2(最早)是不允許的偏移量。
failOnDataLoss true or false true streaming query 當資料丟失的時候,這是一個失敗的查詢。(如:主題被刪除,或偏移量超出範圍。)這可能是一個錯誤的警報。當它不像你預期的那樣工作時,你可以禁用它。如果由於資料丟失而不能從提供的偏移量中讀取任何資料,批處理查詢總是會失敗。
kafkaConsumer.pollTimeoutMs long 512 streaming and batch 在執行器中從卡夫卡輪詢執行資料,以毫秒為超時間隔單位。
fetchOffset.numRetries int 3 streaming and batch 放棄獲取卡夫卡偏移值之前重試的次數。
fetchOffset.retryIntervalMs long 10 streaming and batch 在重新嘗試取回Kafka偏移量之前等待毫秒值。
maxOffsetsPerTrigger long none streaming and batch 對每個觸發器間隔處理的偏移量的最大數量的速率限制。偏移量的指定總數將按比例在不同卷的topic分割槽上進行分割。

3. 解析資料

對於Kafka傳送過來的是JSON格式的資料,我們可以使用functions裡面的from_json()函式解析,並選擇我們所需要的列,並做相對的transformation處理。

注意在這裡不能有Action操作,如foreach(),這些操作需在後面StreamingQuery中使用

Dataset<Row> tboxDataSet = rawDataset
	.where("topic = my_topic")
	.select(functions.from_json(functions.col("value").cast("string"), tboxScheme).alias("parsed_value"))
	.select("parsed_value.columnA",
            "parsed_value.columnB",
            "parsed_value.columnC",
            "timestamp");

最後一行我們選擇了timestamp時間戳,以供後面時間視窗聚合使用。

4. 時間視窗

如果我們要使用groupby()函式對某個時間段所有的資料進行處理,我們則需要使用時間視窗函式如下:

Dataset<Row> windowtboxDataSet = tboxDataSet
	.withWatermark("timestamp", "5 seconds")
	.groupBy(functions.window(functions.col("timestamp"), "10 minutes", "5 minutes"),
		functions.col("columnA"))
    .count();

這裡對columnA列進行groupby()+count()計數,詳解如下:

4.1 簡易例子

為了理解時間視窗,舉一個官方例子:

ataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();

  • 我們有一系列 arriving 的 records
  • 首先是一個對著時間列timestamp做長度為10m,滑動為5m的window()操作
    • 例如上圖右上角的虛框部分,當達到一條記錄 12:22|dog 時,會將 12:22 歸入兩個視窗 12:15-12:2512:20-12:30,所以產生兩條記錄:12:15-12:25|dog12:20-12:30|dog,對於記錄 12:24|dog owl 同理產生兩條記錄:12:15-12:25|dog owl12:20-12:30|dog owl
    • 所以這裡 window() 操作的本質是 explode(),可由一條資料產生多條資料
  • 然後對window()操作的結果,以window列和 word列為 key,做groupBy().count()操作
    • 這個操作的聚合過程是增量的(藉助 StateStore)
  • 最後得到一個有 window, word, count 三列的狀態集

4.2 OutputModes

我們繼續來看前面 window() + groupBy().count() 的例子,現在我們考慮將結果輸出,即考慮 OutputModes:

4.2.1 Complete

Complete 的輸出是和 State 是完全一致的:

img

4.2.2 Append

Append 的語義將保證,一旦輸出了某條 key,未來就不會再輸出同一個 key。

img

所以,在上圖 12:10 這個批次直接輸出 12:00-12:10|cat|1, 12:05-12:15|cat|1 將是錯誤的,因為在 12:20 將結果更新為了 12:00-12:10|cat|2,但是 Append 模式下卻不會再次輸出 12:00-12:10|cat|2,因為前面輸出過了同一條 key 12:00-12:10|cat 的結果12:00-12:10|cat|1

為了解決這個問題,在 Append 模式下,Structured Streaming 需要知道,某一條 key 的結果什麼時候不會再更新了。當確認結果不會再更新的時候(下一篇文章專門詳解依靠 watermark 確認結果不再更新),就可以將結果進行輸出。

img

如上圖所示,如果我們確定 12:30 這個批次以後不會再有對 12:00-12:10 這個 window 的更新,那麼我們就可以把 12:00-12:10 的結果在 12:30 這個批次輸出,並且也會保證後面的批次不會再輸出 12:00-12:10 的 window 的結果,維護了 Append 模式的語義。

4.2.3 Update

Update 模式已在 Spark 2.1.1 及以後版本獲得正式支援。

img

如上圖所示,在 Update 模式中,只有本執行批次 State 中被更新了的條目會被輸出:

  • 在 12:10 這個執行批次,State 中全部 2 條都是新增的(因而也都是被更新了的),所以輸出全部 2 條;
  • 在 12:20 這個執行批次,State 中 2 條是被更新了的、 4 條都是新增的(因而也都是被更新了的),所以輸出全部 6 條;
  • 在 12:30 這個執行批次,State 中 4 條是被更新了的,所以輸出 4 條。這些需要特別注意的一點是,如 Append 模式一樣,本執行批次中由於(通過 watermark 機制)確認 12:00-12:10 這個 window 不會再被更新,因而將其從 State 中去除,但沒有因此產生輸出。

4.3 Watermark 機制

對上面這個例子泛化一點,是:

  • (a+) 在對 event time 做 window() + groupBy().aggregation() 即利用狀態做跨執行批次的聚合,並且
  • (b+) 輸出模式為 Append 模式或 Update 模式

時,Structured Streaming 將依靠 watermark 機制來限制狀態儲存的無限增長、並(對 Append 模式)儘早輸出不再變更的結果。

換一個角度,如果既不是 Append 也不是 Update 模式,或者是 Append 或 Update 模式、但不需狀態做跨執行批次的聚合時,則不需要啟用 watermark 機制。

具體的,我們啟用 watermark 機制的方式是:

    val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
    
    // Group the data by window and word and compute the count of each group
    val windowedCounts = words
        .withWatermark("timestamp", "10 minutes")  // 注意這裡的 watermark 設定!
        .groupBy(
            window($"timestamp", "10 minutes", "5 minutes"),
            $"word")
        .count()

這樣即告訴 Structured Streaming,以 timestamp 列的最大值為錨點,往前推 10min 以前的資料不會再收到。這個值 —— 當前的最大 timestamp 再減掉 10min —— 這個隨著 timestamp 不斷更新的 Long 值,就是 watermark。 img

所以,在之前的這裡圖示中:

  • 12:20 這個批次結束後,錨點變成了 12:20|dog owl 這條記錄的 event time 12:20 ,watermark 變成了 12:20 - 10min = 12:10
  • 所以,在 12:30 批次結束時,即知道 event time 12:10 以前的資料不再收到了,因而 window 12:00-12:10 的結果也不會再被更新,即可以安全地輸出結果 12:00-12:10|cat|2
  • 在結果 12:00-12:10|cat|2 輸出以後,State 中也不再儲存 window 12:00-12:10 的相關資訊 —— 也即 State Store 中的此條狀態得到了清理。

5. 輸出

5.1 StreamingQuery定義

定義完 final result DataFrame/Dataset ,剩下的就是開始 streaming computation 。 為此,我們須使用 DataStreamWriter 通過 Dataset.writeStream() 返回。

  • Output mode (輸出模式): 指定寫入 output sink 的內容,即上文提到的complete, append, update模式
  • Query name (查詢名稱): 可選,指定用於標識的查詢的唯一名稱。
  • Trigger interval (觸發間隔): 可選,指定觸發間隔。 如果未指定,則系統將在上一次處理完成後立即檢查新資料的可用性。 如果由於先前的處理尚未完成而導致觸發時間錯誤,則系統將嘗試在下一個觸發點觸發,而不是在處理完成後立即觸發。
  • Checkpoint location (檢查點位置): 對於可以保證 end-to-end fault-tolerance (端對端容錯)能力的某些 output sinks ,請指定系統將寫入所有 checkpoint (檢查點)資訊的位置。 這是與 HDFS 相容的容錯檔案系統中的目錄。

不同的輸出模式有不同的相容性:

  • Append mode (default) - 這是預設模式,其中只有 自從上一次觸發以來,新增到 Result Table 的新行將會是 outputted to the sink 。 只有新增到 Result Table 的行將永遠不會改變那些查詢才支援這一點。即上文提到的一旦輸出了某條 key,未來就不會再輸出同一個 key。 因此,這種模式 保證每行只能輸出一次(假設 fault-tolerant sink )。例如,只有 select, where, map, flatMap, filter, join 等查詢支援 Append mode 。
  • Complete mode - 每次觸發後,整個 Result Table 將被輸出到 sink 。 aggregation queries (聚合查詢)支援這一點。
  • Update mode - (自 Spark 2.1.1 可用) 只有 Result Table rows 自上次觸發後更新將被輸出到 sink 。

5.2 Output Sinks

Spark有幾種型別的內建輸出接收器。

  • **File sink ** - 將輸出儲存到目錄中。
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
  • Foreach sink - 對 output 中的記錄執行 arbitrary computation ,一般很常用,可以將流資料儲存到資料庫等,詳細用法後面會提到
writeStream
    .foreach(...)
    .start()
  • **Console sink (for debugging) ** - 每次觸發時,將輸出列印到 console/stdout 。 都支援 Append 和 Complete 輸出模式。 這應該用於低資料量的除錯目的,因為在每次觸發後,整個輸出被收集並存儲在驅動程式的記憶體中。
writeStream
    .format("console")
    .start()
  • **Memory sink (for debugging) ** - 輸出作為 in-memory table (記憶體表)儲存在記憶體中。都支援 Append 和 Complete 輸出模式。 這應該用於除錯目的在低資料量下,整個輸出被收集並存儲在驅動程式的儲存器中。因此,請謹慎使用。
writeStream
    .format("memory")
    .queryName("tableName")
    .start()
  • Kafka sink將資料輸出至Kafka
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

Dataframe 寫入 Kafka 應該在 schema(模式)中有以下列:

Column Type
key (optional) string or binary
value (required) string or binary
topic (*optional) string

某些 sinks 是不容錯的,因為它們不能保證輸出的永續性並且僅用於除錯目的。參見前面的部分 容錯語義 。以下是 Spark 中所有接收器的詳細資訊。

Sink (接收器) Supported Output Modes (支援的輸出模式) Options (選項) Fault-tolerant (容錯) Notes (說明)
File Sink (檔案接收器) Append (附加) path: 必須指定輸出目錄的路徑。 有關特定於檔案格式的選項,請參閱 DataFrameWriter (Scala/Java/Python/R) 中的相關方法。 例如,對於 “parquet” 格式選項,請參閱 DataFrameWriter.parquet() Yes 支援對 partitioned tables (分割槽表)的寫入。按時間 Partitioning (劃分)可能是有用的。
Foreach Sink Append, Update, Compelete (附加,更新,完全) None 取決於 ForeachWriter 的實現。 更多詳細資訊在 下一節
Console Sink (控制檯接收器) Append, Update, Complete (附加,更新,完全) numRows: 每個觸發器需要列印的行數(預設:20) truncate: 如果輸出太長是否截斷(預設: true) No
Memory Sink (記憶體接收器) Append, Complete (附加,完全) None 否。但是在 Complete Mode 模式下,重新啟動的查詢將重新建立完整的表。 Table name is the query name.(表名是查詢的名稱)

5.3 Foreach

foreach 操作允許在輸出資料上計算 arbitrary operations 。從 Spark 2.1 開始,這隻適用於 Scala 和 Java 。為了使用這個,你必須實現介面 ForeachWriter 其具有在 trigger (觸發器)之後生成 sequence of rows generated as output (作為輸出的行的序列)時被呼叫的方法。 舉個例子:

// storage result into mongodb
        dataset.writeStream()
                .queryName("mongodb" + collectionName)
                .foreach(new ForeachWriter<Row>() {

                    Map<String, String> writeOverrides = new HashMap<String, String>() {{
                        put("uri", MongoDbConfig.MONGO_DB_URI);
                        put("database", MongoDbConfig.MONGO_MOFANG_TSP_DATA_DB);
                        put("collection", collectionName);
                    }};
                    WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
                    MongoConnector mongoConnector = null;
                    ArrayList<Row> list = null;

                    @Override
                    public void process(Row value) {
                        list.add(value);
                    }

                    @Override
                    public void close(Throwable errorOrNull) {
                        if (!list.isEmpty()) {
                            mongoConnector.withCollectionDo(writeConfig, Document.class, (MongoCollection<Document> mongoCollection) -> {
                                for (Row row : list) {
                                    Map<String, Object> map = new HashMap<>();
                                    String[] fieldNames = row.schema().fieldNames();
                                    for (String s : fieldNames) {
                                        map.put(s, row.getAs(s));
                                    }
                                    Document document = new Document(map);
                                    mongoCollection.insertOne(document);
                                }
                                return null;
                            });
                        }
                    }

                    @Override
                    public boolean open(long partitionId, long version) {
                        mongoConnector = MongoConnector.apply(writeConfig.asOptions());
                        list = new ArrayList<>();
                        return true;
                    }
                })
                .start();
    }

以上程式碼將Dataset的所有列存入MongoDB的指定DB與Collection

注意以下要點。

  • writer 必須是 serializable (可序列化)的,因為它將被序列化併發送給 executors 執行。

  • open ,process 和 close 三個方法都會在executor上被呼叫。

  • 只有當呼叫 open 方法時,writer 才能執行所有的初始化(例如開啟連線,啟動事務等)。請注意,如果在建立物件時立即在類中進行任何初始化,那麼該初始化將在 driver 中發生(因為這是正在建立的例項)。

  • version 和 partition 是 open 中的兩個引數,它們獨特地表示一組需要被 pushed out 的行。 version 是每個觸發器增加的單調遞增的 id 。 partition 是一個表示輸出分割槽的 id ,因為輸出是分散式的,將在多個執行器上處理。

  • open 可以使用 version 和 partition 來選擇是否需要寫入行的順序。因此,它可以返回 true (繼續寫入)或 false ( 不需要寫入 )。如果返回 false ,那麼 process 不會在任何行上被呼叫。例如,在 partial failure (部分失敗)之後,失敗的觸發器的一些輸出分割槽可能已經被提交到資料庫。基於儲存在資料庫中的 metadata (元資料), writer 可以識別已經提交的分割槽,因此返回 false 以跳過再次提交它們。

  • 當 open 被呼叫時, close 也將被呼叫(除非 JVM 由於某些錯誤而退出)。即使 open 返回 false 也是如此。如果在處理和寫入資料時出現任何錯誤,那麼 close 將被錯誤地呼叫。我們有責任清理以 open 建立的狀態(例如,連線,事務等),以免資源洩漏。

6. 最後

最後等待所有流查詢完成:

 // await for termination
 try {
 	sparkSession.streams().awaitAnyTermination();
} catch (StreamingQueryException e) {
	e.printStackTrace();
}

管理StreamingQuery物件的全部操作如下:

StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates

7. Reference