[Spark]-結構化流之監控&故障恢復篇
6 流的監控以及故障恢復
6.1.流的運行時數據
結構化流啟動後返回的 StreamingQuery
對象.
val query = df.writeStream.format("console").start() // get the query object
query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId // get the unique id of this run of the query, which will be generated at every start/restart
query.name // get the name of the auto-generated or user-specified name
query.explain() // print detailed explanations of the query
query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
query.exception // the exception if the query has been terminated with error
query.recentProgress // an array of the most recent progress updates for this query
query.lastProgress // the most recent progress update of this streaming query
6.2 交互式(同步)監控
可以直接獲取活動查詢的當前狀態和指標使用 streamingQuery.lastProgress()
和 streamingQuery.status()
lastProgress()
返回一個 StreamingQueryProgress
對象 它有流的最後一個觸發器中取得的progress的全部信息 - 處理了哪些數據,處理率是多少,延遲等等
streamingQuery.recentProgress
返回最後幾個進度的 array
streamingQuery.status()
返回一個 StreamingQueryStatus
6.3 異步監控
在sparkSession上附加一個 StreamingQueryListener.
一旦你使用 sparkSession.streams.attachListener()
附加你的自定義 StreamingQueryListener
對象,當您啟動查詢和當有活動查詢有進度時停止時,您將收到 callbacks (回調)
6.4 故障恢復
如果發生 failure or intentional shutdown (故障或故意關機),您可以恢復之前的查詢的進度和狀態,並繼續停止的位置.(通過 checkpointing and write ahead logs (檢查點和預寫入日誌)完成)
通過配置 checkpoint location (檢查點位置)查詢,將保存所有進度信息(即,每個觸發器中處理的偏移範圍)和正在運行的 aggregates (聚合)
aggDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
[Spark]-結構化流之監控&故障恢復篇