1. 程式人生 > >2,StructuredStreaming的事件時間和窗口操作

2,StructuredStreaming的事件時間和窗口操作

struct tps cdr sta lin apache mode second fmt

推薦閱讀:1,StructuredStreaming簡介

使用Structured Streaming基於事件時間的滑動窗口的聚合操作是很簡單的,很像分組聚合。在一個分組聚合操作中,聚合值被唯一保存在用戶指定的列中。在基於窗口的聚合的情況下,對於行的事件時間的每個窗口,維護聚合值。

如前面的例子,我們運行wordcount操作,希望以10min窗口計算,每五分鐘滑動一次窗口。也即,12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 這些十分鐘窗口中進行單詞統計。12:00 - 12:10意思是在12:00之後到達12:10之前到達的數據,比如一個單詞在12:07收到。這個單詞會影響12:00 - 12:10, 12:05 - 12:15兩個窗口。

結果表將如下所示。

技術分享圖片

import org.apache.spark.sql.streaming.Trigger
import java.sql.Timestamp
import org.apache.spark.sql.functions._
import spark.implicits._

val lines=spark.readStream.format("socket").option("host", "127.0.0.1").option("port", 9999).option("includeTimestamp", true).load()

val words=lines.as[(String, Timestamp)].flatMap(line=>line._1.split(" ").map(word=> (word, line._2))).toDF("word", "timestamp")
val windowedCounts=words.withWatermark("timestamp", "30 seconds").groupBy(window($"timestamp", "30 seconds", "15 seconds"), $"word").count()
val query=windowedCounts.writeStream.outputMode("Append").format("console").trigger(Trigger.ProcessingTime(5000)).option("truncate", "false").start()
query.awaitTermination()

推薦閱讀:

Spark Structured Streaming高級特性

Spark Streaming 中管理 Kafka Offsets 的幾種方式

2,StructuredStreaming的事件時間和窗口操作