1. 程式人生 > 實用技巧 > Spark Structured Streaming 的 Stateful 操作

Spark Structured Streaming 的 Stateful 操作

Structured Streaming 內部使用 StateStore 模組來實現增量的持續查詢與故障恢復。
StateStore 模組提供了分片的、分版本的、可遷移的、高可用的 key-value store。

在應用層面，則主要使用 mapGroupsWithState 和 flatMapGroupsWithState 來實現有狀態（stateful）操作。

以下範例參考自這篇文章：https://blog.csdn.net/wangpei1949/article/details/105028892 ，按「分鐘 + userID」統計每位使用者每分鐘的 PV。

/**
 * Streaming per-minute page-view (PV) count per user, built on `mapGroupsWithState`.
 *
 * Pipeline: JSON events (`eventTime`, `eventType`, `userID`) are read from the Kafka topic
 * `test_1`, `eventTime` is parsed as a GMT+8 local time, rows are grouped by
 * "<event-time minute>,<userID>", a running count per group is kept in `GroupState`, and the
 * updated counts are printed to the console every 2 seconds (update output mode).
 *
 * Each group's state gets an event-time timeout at the end of its minute, so it is removed once
 * the watermark (max event time seen minus 2 minutes) moves past that minute.
 */
object MapGroupsWithStateExample {

  /** Builds the streaming query and blocks until it terminates. */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.appName("MapGroupsWithStateExample").getOrCreate()
    // Provides the `$"..."` column syntax and the Encoders required by groupByKey,
    // mapGroupsWithState and toDF below.
    import spark.implicits._

    spark.udf.register("timezoneToTimestamp", timezoneToTimestamp _)

    // Schema of the JSON payload carried in each Kafka record's `value`.
    val jsonSchema =
      """{
        "type":"struct",
        "fields":[
          {
            "name":"eventTime",
            "type":"string",
            "nullable":true
          },
          {
            "name":"eventType",
            "type":"string",
            "nullable":true
          },
          {
            "name":"userID",
            "type":"string",
            "nullable":true
          }
        ]
      }"""

    val inputTable = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092")
        .option("subscribe", "test_1")
        .load()

    val resultTable = inputTable
        .select(from_json(col("value").cast("string"), DataType.fromJson(jsonSchema)).as("value"))
        .select($"value.*")
        .withColumn("timestamp",
                    functions.callUDF("timezoneToTimestamp",
                                      functions.col("eventTime"),
                                      lit("yyyy-MM-dd HH:mm:ss"),
                                      lit("GMT+8")))
        // Rows whose eventTime could not be parsed have a null timestamp and are dropped here.
        .filter($"timestamp".isNotNull && $"eventType".isNotNull && $"userID".isNotNull)
        .withWatermark("timestamp", "2 minutes")
        .groupByKey((row: Row) => {
            // Group key: "<event-time minute>,<userID>", the minute formatted as
            // yyyy-MM-dd HH:mm in the JVM default time zone.
            val timestamp = row.getAs[Timestamp]("timestamp")
            val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
            val currentEventTimeMinute = sdf.format(new Date(timestamp.getTime))
            currentEventTimeMinute + "," + row.getAs[String]("userID")
        })
        // Type parameters: the state type (groupKey, running count) and the output type
        // (minute, userID, pv). EventTimeTimeout makes timeouts fire based on the watermark.
        .mapGroupsWithState[(String, Long), (String, String, Long)](GroupStateTimeout.EventTimeTimeout())(
            // Invoked for groups with new rows in this trigger and for groups whose timeout has
            // fired (see hasTimedOut). One state is kept per (minute, userID) key.
            (groupKey: String, currentBatchRows: Iterator[Row], groupState: GroupState[(String, Long)]) => {
                println("當前組對應的 Key: " + groupKey)
                println("當前 Watermark: " + groupState.getCurrentWatermarkMs())
                println("當前組的狀態是否存在: " + groupState.exists)
                println("當前組的狀態是否過期: " + groupState.hasTimedOut)

                // The minute part never contains a comma, so split into at most 2 pieces to keep
                // any comma inside userID intact.
                val keyParts = groupKey.split(",", 2)
                val minute = keyParts(0)
                val userID = keyParts(1)

                val totalValue: Long =
                    if (groupState.hasTimedOut) {
                        // The watermark has passed this group's timeout: drop its state, emit nothing.
                        println("清除狀態...")
                        groupState.remove()
                        0L
                    } else {
                        val newTotal: Long =
                            if (groupState.exists) {
                                // Known group: running total = stored count + rows in this trigger.
                                println("增量聚合....")
                                groupState.get._2 + currentBatchRows.size
                            } else {
                                // First rows seen for this group: start counting.
                                println("初始化狀態...")
                                currentBatchRows.size.toLong
                            }
                        groupState.update((groupKey, newTotal))

                        // With EventTimeTimeout a group only times out after a timeout timestamp
                        // has been set; without this call the state would be kept forever.
                        // Expire at the end of this minute, clamped so it is never earlier than
                        // the current watermark (Spark may reject such a timestamp).
                        // (With ProcessingTimeTimeout one would call setTimeoutDuration instead.)
                        val minuteEndMs =
                            new SimpleDateFormat("yyyy-MM-dd HH:mm").parse(minute).getTime + 60 * 1000L
                        groupState.setTimeoutTimestamp(math.max(minuteEndMs, groupState.getCurrentWatermarkMs()))
                        newTotal
                    }

                // mapGroupsWithState must return exactly one value per call. A timed-out group
                // returns a zero count, which the filter below removes; this avoids returning a
                // null top-level tuple.
                (minute, userID, totalValue)
            }
        )
        .filter(_._3 > 0)
        .toDF("minute", "userID", "pv")

    // Print each trigger's updated rows to the console every 2 seconds.
    val query = resultTable
        .writeStream
        .format("console")
        .option("truncate", "false")
        .outputMode("update")
        .trigger(Trigger.ProcessingTime("2 seconds"))
        .start()

    query.awaitTermination()
  }

  /**
   * Parses a local date-time string, interpreted in the given zone, into a `java.sql.Timestamp`.
   * Registered in `main` as the SQL UDF `timezoneToTimestamp`.
   *
   * @param dateTime       the date-time text, e.g. "2020-03-22 10:15:30"; may be null
   * @param dataTimeFormat a `DateTimeFormatter` pattern, e.g. "yyyy-MM-dd HH:mm:ss"
   * @param dataTimeZone   a `ZoneId` string, e.g. "GMT+8"
   * @return the corresponding instant, or null when `dateTime` is null or cannot be parsed
   *         (`main` drops such rows with an `isNotNull` filter)
   */
  def timezoneToTimestamp(dateTime: String, dataTimeFormat: String, dataTimeZone: String): Timestamp =
    if (dateTime == null) null
    else
      try {
        val format = DateTimeFormatter.ofPattern(dataTimeFormat)
        val eventTime = LocalDateTime.parse(dateTime, format).atZone(ZoneId.of(dataTimeZone))
        new Timestamp(eventTime.toInstant.toEpochMilli)
      } catch {
        // Best-effort: one malformed record must not fail the query, but report what went wrong.
        case scala.util.control.NonFatal(ex) =>
          println(s"error: cannot parse '$dateTime' with pattern '$dataTimeFormat' in zone '$dataTimeZone': ${ex.getMessage}")
          null
      }
}