Spark Structured Streaming + Kafka使用筆記
這篇部落格將會記錄Structured Streaming + Kafka的一些基本使用(Java 版)
spark 2.3.0
1. 概述
Structured Streaming (結構化流)是一種基於 Spark SQL 引擎構建的可擴充套件且容錯的 stream processing engine (流處理引擎)。可以使用Dataset/DataFrame API 來表示 streaming aggregations (流聚合), event-time windows (事件時間視窗), stream-to-batch joins (流到批處理連線) 等。
Dataset/DataFrame在同一個 optimized Spark SQL engine (優化的 Spark SQL 引擎)上執行計算後,系統通過 checkpointing (檢查點) 和 Write Ahead Logs (預寫日誌)來確保 end-to-end exactly-once (端到端的完全一次性) 容錯保證。
簡而言之,Structured Streaming 提供快速,可擴充套件,容錯,end-to-end exactly-once stream processing (端到端的完全一次性流處理),且無需使用者理解 streaming 。
具體原理可以看之前的這篇部落格
2. 資料來源
對於Kafka資料來源我們需要在Maven/SBT專案中引入:
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.3.2
首先我們需要建立SparkSession及開始接收資料,這裡以Kafka資料為例
SparkSession spark = SparkSession
.builder()
.appName("appName")
.getOrCreate();
Dataset<Row> df = spark.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic.*")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)" )
這裡我們建立了SparkSession並訂閱了幾個host的Kafka。
這裡我們不需要自己設定group.id引數, Kafka Source 會將自動為每個查詢建立一個唯一的 group id
Kafka源資料中的schema如下:
Column | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
對於批處理和流查詢,須為 Kafka source 設定以下選項。
Option | value | meaning |
---|---|---|
assign | json string {“topicA”:[0,1],“topicB”:[2,4]} | 指定 TopicPartitions 來消費。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項。 |
subscribe | 逗號分隔的 topics 列表 | 要訂閱的 topic 列表。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項 |
subscribePattern | Java regex string | 用於訂閱 topic(s) 的 pattern(模式)。針對 Kafka Source 只能指定 “assign”, “subscribe” 或 “subscribePattern” 其中的一個選項。 |
kafka.bootstrap.servers | 逗號分隔的 host:port 列表 | Kafka 中的 “bootstrap.servers” 配置。 |
以下配置是可選的:
Option | value | default | query type | meaning |
---|---|---|---|---|
startingOffsets | “earliest”, “latest” (streaming only), or json string “”" {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-2}} “”" | “latest” 用於 streaming, “earliest” 用於 batch(批處理) | streaming 和 batch | 當一個查詢開始的時候, 或者從最早的偏移量:“earliest”,或者從最新的偏移量:“latest”,或JSON字串指定為每個topicpartition起始偏移。在json中,-2作為偏移量可以用來表示最早的,-1到最新的。注意:對於批處理查詢,不允許使用最新的查詢(隱式或在json中使用-1)。對於流查詢,這隻適用於啟動一個新查詢時,並且恢復總是從查詢的位置開始,在查詢期間新發現的分割槽將會盡早開始。 |
endingOffsets | latest or json string {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}} | latest | batch query | 當一個批處理查詢結束時,或者從最新的偏移量:“latest”, 或者為每個topic分割槽指定一個結束偏移的json字串。在json中,-1作為偏移量可以用於引用最新的,而-2(最早)是不允許的偏移量。 |
failOnDataLoss | true or false | true | streaming query | 當資料丟失的時候,這是一個失敗的查詢。(如:主題被刪除,或偏移量超出範圍。)這可能是一個錯誤的警報。當它不像你預期的那樣工作時,你可以禁用它。如果由於資料丟失而不能從提供的偏移量中讀取任何資料,批處理查詢總是會失敗。 |
kafkaConsumer.pollTimeoutMs | long | 512 | streaming and batch | 在執行器中從卡夫卡輪詢執行資料,以毫秒為超時間隔單位。 |
fetchOffset.numRetries | int | 3 | streaming and batch | 放棄獲取卡夫卡偏移值之前重試的次數。 |
fetchOffset.retryIntervalMs | long | 10 | streaming and batch | 在重新嘗試取回Kafka偏移量之前等待毫秒值。 |
maxOffsetsPerTrigger | long | none | streaming and batch | 對每個觸發器間隔處理的偏移量的最大數量的速率限制。偏移量的指定總數將按比例在不同卷的topic分割槽上進行分割。 |
3. 解析資料
對於Kafka傳送過來的是JSON格式的資料,我們可以使用functions裡面的from_json()函式解析,並選擇我們所需要的列,並做相對的transformation處理。
注意在這裡不能有Action操作,如foreach(),這些操作需在後面StreamingQuery中使用
Dataset<Row> tboxDataSet = rawDataset
.where("topic = my_topic")
.select(functions.from_json(functions.col("value").cast("string"), tboxScheme).alias("parsed_value"))
.select("parsed_value.columnA",
"parsed_value.columnB",
"parsed_value.columnC",
"timestamp");
最後一行我們選擇了timestamp時間戳,以供後面時間視窗聚合使用。
4. 時間視窗
如果我們要使用groupby()函式對某個時間段所有的資料進行處理,我們則需要使用時間視窗函式如下:
Dataset<Row> windowtboxDataSet = tboxDataSet
.withWatermark("timestamp", "5 seconds")
.groupBy(functions.window(functions.col("timestamp"), "10 minutes", "5 minutes"),
functions.col("columnA"))
.count();
這裡對columnA列進行groupby()+count()計數,詳解如下:
4.1 簡易例子
為了理解時間視窗,舉一個官方例子:
ataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word"))
.count();
- 我們有一系列 arriving 的 records
- 首先是一個對著時間列timestamp做長度為10m,滑動為5m的window()操作
- 例如上圖右上角的虛框部分,當達到一條記錄
12:22|dog
時,會將12:22
歸入兩個視窗12:15-12:25
、12:20-12:30
,所以產生兩條記錄:12:15-12:25|dog
、12:20-12:30|dog
,對於記錄12:24|dog owl
同理產生兩條記錄:12:15-12:25|dog owl
、12:20-12:30|dog owl
- 所以這裡 window() 操作的本質是 explode(),可由一條資料產生多條資料
- 例如上圖右上角的虛框部分,當達到一條記錄
- 然後對window()操作的結果,以window列和 word列為 key,做groupBy().count()操作
- 這個操作的聚合過程是增量的(藉助 StateStore)
- 最後得到一個有
window
,word
,count
三列的狀態集
4.2 OutputModes
我們繼續來看前面 window() + groupBy().count() 的例子,現在我們考慮將結果輸出,即考慮 OutputModes:
4.2.1 Complete
Complete 的輸出是和 State 是完全一致的:
4.2.2 Append
Append 的語義將保證,一旦輸出了某條 key,未來就不會再輸出同一個 key。
所以,在上圖 12:10
這個批次直接輸出 12:00-12:10|cat|1
, 12:05-12:15|cat|1
將是錯誤的,因為在 12:20
將結果更新為了 12:00-12:10|cat|2
,但是 Append 模式下卻不會再次輸出 12:00-12:10|cat|2
,因為前面輸出過了同一條 key 12:00-12:10|cat
的結果12:00-12:10|cat|1
。
為了解決這個問題,在 Append 模式下,Structured Streaming 需要知道,某一條 key 的結果什麼時候不會再更新了。當確認結果不會再更新的時候(下一篇文章專門詳解依靠 watermark 確認結果不再更新),就可以將結果進行輸出。
如上圖所示,如果我們確定 12:30
這個批次以後不會再有對 12:00-12:10
這個 window 的更新,那麼我們就可以把 12:00-12:10
的結果在 12:30
這個批次輸出,並且也會保證後面的批次不會再輸出 12:00-12:10
的 window 的結果,維護了 Append 模式的語義。
4.2.3 Update
Update 模式已在 Spark 2.1.1 及以後版本獲得正式支援。
如上圖所示,在 Update 模式中,只有本執行批次 State 中被更新了的條目會被輸出:
- 在 12:10 這個執行批次,State 中全部 2 條都是新增的(因而也都是被更新了的),所以輸出全部 2 條;
- 在 12:20 這個執行批次,State 中 2 條是被更新了的、 4 條都是新增的(因而也都是被更新了的),所以輸出全部 6 條;
- 在 12:30 這個執行批次,State 中 4 條是被更新了的,所以輸出 4 條。這些需要特別注意的一點是,如 Append 模式一樣,本執行批次中由於(通過 watermark 機制)確認
12:00-12:10
這個 window 不會再被更新,因而將其從 State 中去除,但沒有因此產生輸出。
4.3 Watermark 機制
對上面這個例子泛化一點,是:
- (a+) 在對 event time 做 window() + groupBy().aggregation() 即利用狀態做跨執行批次的聚合,並且
- (b+) 輸出模式為 Append 模式或 Update 模式
時,Structured Streaming 將依靠 watermark 機制來限制狀態儲存的無限增長、並(對 Append 模式)儘早輸出不再變更的結果。
換一個角度,如果既不是 Append 也不是 Update 模式,或者是 Append 或 Update 模式、但不需狀態做跨執行批次的聚合時,則不需要啟用 watermark 機制。
具體的,我們啟用 watermark 機制的方式是:
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "10 minutes") // 注意這裡的 watermark 設定!
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
這樣即告訴 Structured Streaming,以 timestamp 列的最大值為錨點,往前推 10min 以前的資料不會再收到。這個值 —— 當前的最大 timestamp 再減掉 10min —— 這個隨著 timestamp 不斷更新的 Long 值,就是 watermark。
所以,在之前的這裡圖示中:
- 在
12:20
這個批次結束後,錨點變成了12:20|dog owl
這條記錄的 event time12:20
,watermark 變成了12:20 - 10min = 12:10
; - 所以,在
12:30
批次結束時,即知道 event time12:10
以前的資料不再收到了,因而 window12:00-12:10
的結果也不會再被更新,即可以安全地輸出結果12:00-12:10|cat|2
; - 在結果
12:00-12:10|cat|2
輸出以後,State 中也不再儲存 window12:00-12:10
的相關資訊 —— 也即 State Store 中的此條狀態得到了清理。
5. 輸出
5.1 StreamingQuery定義
定義完 final result DataFrame/Dataset ,剩下的就是開始 streaming computation 。 為此,我們須使用 DataStreamWriter 通過 Dataset.writeStream() 返回。
- Output mode (輸出模式): 指定寫入 output sink 的內容,即上文提到的complete, append, update模式
- Query name (查詢名稱): 可選,指定用於標識的查詢的唯一名稱。
- Trigger interval (觸發間隔): 可選,指定觸發間隔。 如果未指定,則系統將在上一次處理完成後立即檢查新資料的可用性。 如果由於先前的處理尚未完成而導致觸發時間錯誤,則系統將嘗試在下一個觸發點觸發,而不是在處理完成後立即觸發。
- Checkpoint location (檢查點位置): 對於可以保證 end-to-end fault-tolerance (端對端容錯)能力的某些 output sinks ,請指定系統將寫入所有 checkpoint (檢查點)資訊的位置。 這是與 HDFS 相容的容錯檔案系統中的目錄。
不同的輸出模式有不同的相容性:
- Append mode (default) - 這是預設模式,其中只有 自從上一次觸發以來,新增到 Result Table 的新行將會是 outputted to the sink 。 只有新增到 Result Table 的行將永遠不會改變那些查詢才支援這一點。即上文提到的一旦輸出了某條 key,未來就不會再輸出同一個 key。 因此,這種模式 保證每行只能輸出一次(假設 fault-tolerant sink )。例如,只有 select, where, map, flatMap, filter, join 等查詢支援 Append mode 。
- Complete mode - 每次觸發後,整個 Result Table 將被輸出到 sink 。 aggregation queries (聚合查詢)支援這一點。
- Update mode - (自 Spark 2.1.1 可用) 只有 Result Table rows 自上次觸發後更新將被輸出到 sink 。
5.2 Output Sinks
Spark有幾種型別的內建輸出接收器。
- **File sink ** - 將輸出儲存到目錄中。
writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
- Foreach sink - 對 output 中的記錄執行 arbitrary computation ,一般很常用,可以將流資料儲存到資料庫等,詳細用法後面會提到
writeStream
.foreach(...)
.start()
- **Console sink (for debugging) ** - 每次觸發時,將輸出列印到 console/stdout 。 都支援 Append 和 Complete 輸出模式。 這應該用於低資料量的除錯目的,因為在每次觸發後,整個輸出被收集並存儲在驅動程式的記憶體中。
writeStream
.format("console")
.start()
- **Memory sink (for debugging) ** - 輸出作為 in-memory table (記憶體表)儲存在記憶體中。都支援 Append 和 Complete 輸出模式。 這應該用於除錯目的在低資料量下,整個輸出被收集並存儲在驅動程式的儲存器中。因此,請謹慎使用。
writeStream
.format("memory")
.queryName("tableName")
.start()
- Kafka sink將資料輸出至Kafka
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
Dataframe 寫入 Kafka 應該在 schema(模式)中有以下列:
Column | Type |
---|---|
key (optional) | string or binary |
value (required) | string or binary |
topic (*optional) | string |
某些 sinks 是不容錯的,因為它們不能保證輸出的永續性並且僅用於除錯目的。參見前面的部分 容錯語義 。以下是 Spark 中所有接收器的詳細資訊。
Sink (接收器) | Supported Output Modes (支援的輸出模式) | Options (選項) | Fault-tolerant (容錯) | Notes (說明) |
---|---|---|---|---|
File Sink (檔案接收器) | Append (附加) | path : 必須指定輸出目錄的路徑。 有關特定於檔案格式的選項,請參閱 DataFrameWriter (Scala/Java/Python/R) 中的相關方法。 例如,對於 “parquet” 格式選項,請參閱 DataFrameWriter.parquet() |
Yes | 支援對 partitioned tables (分割槽表)的寫入。按時間 Partitioning (劃分)可能是有用的。 |
Foreach Sink | Append, Update, Compelete (附加,更新,完全) | None | 取決於 ForeachWriter 的實現。 | 更多詳細資訊在 下一節 |
Console Sink (控制檯接收器) | Append, Update, Complete (附加,更新,完全) | numRows : 每個觸發器需要列印的行數(預設:20) truncate : 如果輸出太長是否截斷(預設: true) |
No | |
Memory Sink (記憶體接收器) | Append, Complete (附加,完全) | None | 否。但是在 Complete Mode 模式下,重新啟動的查詢將重新建立完整的表。 | Table name is the query name.(表名是查詢的名稱) |
5.3 Foreach
foreach 操作允許在輸出資料上計算 arbitrary operations 。從 Spark 2.1 開始,這隻適用於 Scala 和 Java 。為了使用這個,你必須實現介面 ForeachWriter 其具有在 trigger (觸發器)之後生成 sequence of rows generated as output (作為輸出的行的序列)時被呼叫的方法。 舉個例子:
// storage result into mongodb
dataset.writeStream()
.queryName("mongodb" + collectionName)
.foreach(new ForeachWriter<Row>() {
Map<String, String> writeOverrides = new HashMap<String, String>() {{
put("uri", MongoDbConfig.MONGO_DB_URI);
put("database", MongoDbConfig.MONGO_MOFANG_TSP_DATA_DB);
put("collection", collectionName);
}};
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
MongoConnector mongoConnector = null;
ArrayList<Row> list = null;
@Override
public void process(Row value) {
list.add(value);
}
@Override
public void close(Throwable errorOrNull) {
if (!list.isEmpty()) {
mongoConnector.withCollectionDo(writeConfig, Document.class, (MongoCollection<Document> mongoCollection) -> {
for (Row row : list) {
Map<String, Object> map = new HashMap<>();
String[] fieldNames = row.schema().fieldNames();
for (String s : fieldNames) {
map.put(s, row.getAs(s));
}
Document document = new Document(map);
mongoCollection.insertOne(document);
}
return null;
});
}
}
@Override
public boolean open(long partitionId, long version) {
mongoConnector = MongoConnector.apply(writeConfig.asOptions());
list = new ArrayList<>();
return true;
}
})
.start();
}
以上程式碼將Dataset的所有列存入MongoDB的指定DB與Collection
注意以下要點。
-
writer 必須是 serializable (可序列化)的,因為它將被序列化併發送給 executors 執行。
-
open ,process 和 close 三個方法都會在executor上被呼叫。
-
只有當呼叫 open 方法時,writer 才能執行所有的初始化(例如開啟連線,啟動事務等)。請注意,如果在建立物件時立即在類中進行任何初始化,那麼該初始化將在 driver 中發生(因為這是正在建立的例項)。
-
version 和 partition 是 open 中的兩個引數,它們獨特地表示一組需要被 pushed out 的行。 version 是每個觸發器增加的單調遞增的 id 。 partition 是一個表示輸出分割槽的 id ,因為輸出是分散式的,將在多個執行器上處理。
-
open 可以使用 version 和 partition 來選擇是否需要寫入行的順序。因此,它可以返回 true (繼續寫入)或 false ( 不需要寫入 )。如果返回 false ,那麼 process 不會在任何行上被呼叫。例如,在 partial failure (部分失敗)之後,失敗的觸發器的一些輸出分割槽可能已經被提交到資料庫。基於儲存在資料庫中的 metadata (元資料), writer 可以識別已經提交的分割槽,因此返回 false 以跳過再次提交它們。
-
當 open 被呼叫時, close 也將被呼叫(除非 JVM 由於某些錯誤而退出)。即使 open 返回 false 也是如此。如果在處理和寫入資料時出現任何錯誤,那麼 close 將被錯誤地呼叫。我們有責任清理以 open 建立的狀態(例如,連線,事務等),以免資源洩漏。
6. 最後
最後等待所有流查詢完成:
// await for termination
try {
sparkSession.streams().awaitAnyTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
}
管理StreamingQuery物件的全部操作如下:
StreamingQuery query = df.writeStream().format("console").start(); // get the query object
query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart
query.name(); // get the name of the auto-generated or user-specified name
query.explain(); // print detailed explanations of the query
query.stop(); // stop the query
query.awaitTermination(); // block until query is terminated, with stop() or with error
query.exception(); // the exception if the query has been terminated with error
query.recentProgress(); // an array of the most recent progress updates for this query
query.lastProgress(); // the most recent progress update of this streaming query
SparkSession spark = ...
spark.streams().active(); // get the list of currently active streaming queries
spark.streams().get(id); // get a query object by its unique id
spark.streams().awaitAnyTermination(); // block until any one of them terminates