1. 程式人生 > >[Spark]-結構化流之監控&故障恢復篇

[Spark]-結構化流之監控&故障恢復篇

完成 位置 bject ESS sta llb att console dir

6 流的監控以及故障恢復

  6.1.流的運行時數據

   結構化流啟動後返回的 StreamingQuery 對象.    

    val query = df.writeStream.format("console").start()   // get the query object

    query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

    query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

    query.name        // get the name of the auto-generated or user-specified name

    query.explain()   // print detailed explanations of the query

    query.stop()      // stop the query

    query.awaitTermination()   // block until query is terminated, with stop() or with error

    query.exception       // the exception if the query has been terminated with error

    query.recentProgress  // an array of the most recent progress updates for this query

    query.lastProgress    // the most recent progress update of this streaming query

  6.2 交互式(同步)監控

    可以直接獲取活動查詢的當前狀態和指標使用 streamingQuery.lastProgress()streamingQuery.status()

    lastProgress() 返回一個 StreamingQueryProgress 對象 它有流的最後一個觸發器中取得的progress的全部信息 - 處理了哪些數據,處理率是多少,延遲等等

    streamingQuery.recentProgress 返回最後幾個進度的 array

    streamingQuery.status() 返回一個 StreamingQueryStatus

對象 提供有關的信息立即執行的查詢 - 觸發器是否 active ,數據是否正在處理等

  6.3 異步監控

    在sparkSession上附加一個 StreamingQueryListener.

    一旦你使用 sparkSession.streams.attachListener() 附加你的自定義 StreamingQueryListener 對象,當您啟動查詢和當有活動查詢有進度時停止時,您將收到 callbacks (回調)

  6.4 故障恢復

    如果發生 failure or intentional shutdown (故障或故意關機),您可以恢復之前的查詢的進度和狀態,並繼續停止的位置.(通過 checkpointing and write ahead logs (檢查點和預寫入日誌)完成)

    通過配置 checkpoint location (檢查點位置)查詢,將保存所有進度信息(即,每個觸發器中處理的偏移範圍)和正在運行的 aggregates (聚合) 

    aggDF
      .writeStream
      .outputMode("complete")
      .option("checkpointLocation", "path/to/HDFS/dir")
      .format("memory")
      .start()

  

[Spark]-結構化流之監控&故障恢復篇