Stateful Operations in Spark Structured Streaming
Internally, Structured Streaming uses the StateStore module to implement incremental continuous queries and failure recovery.
The StateStore module provides a partitioned, versioned, relocatable, and highly available key-value store.
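For reference, both the state store backend and the checkpoint directory that state is persisted to are configurable per query. The sketch below is not from the referenced article: it assumes Spark 3.2+ (where the RocksDB-backed provider was added), and the rate-source query and checkpoint path are placeholders used only to make the snippet self-contained.

import org.apache.spark.sql.SparkSession

// Minimal sketch (hypothetical, Spark 3.2+): choose the StateStore backend and a checkpoint
// directory; the versioned state kept under the checkpoint location is what recovery replays.
object StateStoreConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StateStoreConfigSketch")
      // Swap the default HDFS-backed provider for the RocksDB-backed one (Spark 3.2+)
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      .getOrCreate()

    // Any stateful query (streaming aggregation, dropDuplicates, mapGroupsWithState, ...)
    // keeps per-partition, per-version state under the checkpoint location.
    val counts = spark.readStream
      .format("rate")                  // built-in test source emitting (timestamp, value) rows
      .option("rowsPerSecond", "5")
      .load()
      .groupBy("value")
      .count()

    counts.writeStream
      .format("console")
      .outputMode("update")
      .option("checkpointLocation", "/tmp/checkpoints/state-store-sketch")  // placeholder path
      .start()
      .awaitTermination()
  }
}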
At the application level, stateful operations are mainly implemented with mapGroupsWithState and flatMapGroupsWithState.
The example below is adapted from this article: https://blog.csdn.net/wangpei1949/article/details/105028892
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneId}
import java.util.Date

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, Trigger}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SparkSession, functions}

object MapGroupsWithStateExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("MapGroupsWithStateExample").getOrCreate()
    import spark.implicits._

    spark.udf.register("timezoneToTimestamp", timezoneToTimestamp _)

    val jsonSchema =
      """{
        |  "type":"struct",
        |  "fields":[
        |    {"name":"eventTime", "type":"string", "nullable":true},
        |    {"name":"eventType", "type":"string", "nullable":true},
        |    {"name":"userID",    "type":"string", "nullable":true}
        |  ]
        |}""".stripMargin

    val inputTable = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092")
      .option("subscribe", "test_1")
      .load()

    val resultTable = inputTable
      .select(from_json(col("value").cast("string"), DataType.fromJson(jsonSchema)).as("value"))
      .select($"value.*")
      .withColumn("timestamp",
        functions.callUDF("timezoneToTimestamp",
          functions.col("eventTime"), lit("yyyy-MM-dd HH:mm:ss"), lit("GMT+8")))
      .filter($"timestamp".isNotNull && $"eventType".isNotNull && $"userID".isNotNull)
      .withWatermark("timestamp", "2 minutes")
      .groupByKey((row: Row) => {
        // Use "minute + userID" as the key of each group
        val timestamp = row.getAs[Timestamp]("timestamp")
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
        val currentEventTimeMinute = sdf.format(new Date(timestamp.getTime))
        currentEventTimeMinute + "," + row.getAs[String]("userID")
      })
      // The type parameters (String, Long) and (String, String, Long) are the state type and
      // the return type; GroupStateTimeout.EventTimeTimeout() specifies how timeouts are judged.
      .mapGroupsWithState[(String, Long), (String, String, Long)](GroupStateTimeout.EventTimeTimeout())(
        // The mapping function takes three arguments: the key of the current group,
        // all rows of the current group in this micro-batch, and the state of the group.
        // One state instance is maintained per (minute + userID).
        (groupKey: String, currentBatchRows: Iterator[Row], groupState: GroupState[(String, Long)]) => {
          println("Key of the current group: " + groupKey)
          println("Current watermark: " + groupState.getCurrentWatermarkMs())
          println("State of the current group exists: " + groupState.exists)
          println("State of the current group has timed out: " + groupState.hasTimedOut)

          var totalValue = 0L

          if (groupState.hasTimedOut) {
            // The state of this group has timed out, so clear it
            println("Removing state...")
            groupState.remove()
          } else if (groupState.exists) {
            // The state of this group already exists, so update it incrementally
            println("Incremental aggregation...")
            // Historical value: read from the state
            val historyValue = groupState.get._2
            // Current value: computed from the new rows of this group
            val currentValue = currentBatchRows.size
            // Total = history + current
            totalValue = historyValue + currentValue
            // Write back the updated state
            val newState = (groupKey, totalValue)
            groupState.update(newState)
            // In event-time mode there is no need to set a timeout duration: state expires
            // automatically based on the watermark. In processing-time mode a timeout duration
            // can be set so that state is cleaned up and does not grow without bound.
            // groupState.setTimeoutDuration(1 * 10 * 1000)
          } else {
            // No state exists for this group yet, so initialize it
            println("Initializing state...")
            totalValue = currentBatchRows.size
            val initialState = (groupKey, totalValue * 1L)
            groupState.update(initialState)
          }

          if (totalValue != 0) {
            val groupKeyArray = groupKey.split(",")
            (groupKeyArray(0), groupKeyArray(1), totalValue)
          } else {
            null
          }
        }
      )
      .filter(_ != null)
      .toDF("minute", "userID", "pv")

    // Start the query
    val query = resultTable
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()

    query.awaitTermination()
  }

  def timezoneToTimestamp(dateTime: String, dataTimeFormat: String, dataTimeZone: String): Timestamp = {
    var output: Timestamp = null
    try {
      if (dateTime != null) {
        val format = DateTimeFormatter.ofPattern(dataTimeFormat)
        val eventTime = LocalDateTime.parse(dateTime, format).atZone(ZoneId.of(dataTimeZone))
        output = new Timestamp(eventTime.toInstant.toEpochMilli)
      }
    } catch {
      case ex: Exception => println("Failed to parse event time '" + dateTime + "': " + ex.getMessage)
    }
    output
  }
}
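The example above uses mapGroupsWithState, which returns exactly one record per group and per trigger. flatMapGroupsWithState is the more general variant mentioned earlier: the function returns an Iterator (zero or more records per group), and the output mode is passed in explicitly. The sketch below is not from the referenced article; it is a hypothetical running count per userID over a socket source, with placeholder host/port and timeout values, and it uses a processing-time timeout instead of the event-time timeout shown above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Minimal sketch (hypothetical): running count per userID with flatMapGroupsWithState.
object FlatMapGroupsWithStateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("FlatMapGroupsWithStateSketch").getOrCreate()
    import spark.implicits._

    // Each input line is treated as a userID (socket source on a placeholder host/port).
    val users = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]

    val counts = users
      .groupByKey((user: String) => user)
      .flatMapGroupsWithState[Long, (String, Long)](
        OutputMode.Update(), GroupStateTimeout.ProcessingTimeTimeout())(
        (userID: String, rows: Iterator[String], state: GroupState[Long]) => {
          if (state.hasTimedOut) {
            // Timed out: drop the state and emit nothing for this group
            state.remove()
            Iterator.empty
          } else {
            val newCount = state.getOption.getOrElse(0L) + rows.size
            state.update(newCount)
            // With processing-time timeout the duration must be (re)set on every invocation
            state.setTimeoutDuration("10 minutes")
            // The function returns an Iterator, so a group may emit zero, one, or many rows
            Iterator((userID, newCount))
          }
        })

    counts.writeStream
      .format("console")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}

Note that with ProcessingTimeTimeout the timeout has to be re-armed via setTimeoutDuration on each call, whereas with EventTimeTimeout (as in the main example) expiry is driven by the watermark.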