2,StructuredStreaming的事件時間和窗口操作
推薦閱讀:1,StructuredStreaming簡介
使用Structured Streaming基於事件時間的滑動窗口的聚合操作是很簡單的,很像分組聚合。在一個分組聚合操作中,聚合值被唯一保存在用戶指定的列中。在基於窗口的聚合的情況下,對於行的事件時間的每個窗口,維護聚合值。
如前面的例子,我們運行wordcount操作,希望以10min窗口計算,每五分鐘滑動一次窗口。也即,12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 這些十分鐘窗口中進行單詞統計。12:00 - 12:10意思是在12:00之後到達12:10之前到達的數據,比如一個單詞在12:07收到。這個單詞會影響12:00 - 12:10, 12:05 - 12:15兩個窗口。
結果表將如下所示。
import org.apache.spark.sql.streaming.Trigger
import
java.sql.Timestamp
import
org.apache.spark.sql.functions._
import
spark.implicits._
val
lines=spark.readStream.format("socket").option("host", "127.0.0.1").option("port", 9999).option("includeTimestamp", true).load()
val
words=lines.as[(String, Timestamp)].flatMap(line=>line._1.split(" ").map(word=>
(word,
line._2))).toDF("word", "timestamp")
val
windowedCounts=words.withWatermark("timestamp", "30
seconds").groupBy(window($"timestamp", "30
seconds", "15
seconds"), $"word").count()
val
query=windowedCounts.writeStream.outputMode("Append").format("console").trigger(Trigger.ProcessingTime(5000)).option("truncate", "false").start()
query.awaitTermination()
推薦閱讀:
Spark Structured Streaming高級特性
Spark Streaming 中管理 Kafka Offsets 的幾種方式
2,StructuredStreaming的事件時間和窗口操作