Spark2.3(三十四):Spark Structured Streaming之withWaterMark和windows視窗是否可以實現最近一小時統計
阿新 • • 發佈:2018-12-02
WaterMark除了可以限定來遲資料範圍,是否可以實現最近一小時統計?
WaterMark目的用來限定引數計算資料的範圍:比如當前計算資料內max timestamp是12::00,waterMark限定資料分為是60 minutes,那麼如果此時輸入11:00之前的資料就會被捨棄不參與統計,視為來遲範圍超出了60minutes限定範圍。
那麼,是否可以藉助它實現最近一小時的資料統計呢?
程式碼示例:
package com.dx.streaming import java.sql.Timestamp import java.text.SimpleDateFormat importorg.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{Encoders, SparkSession} import org.apache.log4j.{Level, Logger} case class MyEntity(id: String, timestamp: Timestamp, value: Integer) object Main { Logger.getLogger("org.apache.spark").setLevel(Level.WARN); Logger.getLogger("akka").setLevel(Level.ERROR); Logger.getLogger("kafka").setLevel(Level.ERROR); def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate() val lines = spark.readStream.format("socket").option("host", "192.168.0.141").option("port", 19999).load() var sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") import spark.implicits._ lines.as(Encoders.STRING) .map(row => { val fields = row.split(",") MyEntity(fields(0), new Timestamp(sdf.parse(fields(1)).getTime), Integer.valueOf(fields(2))) }) .createOrReplaceTempView("tv_entity") spark.sql("select id,timestamp,value from tv_entity") .withWatermark("timestamp", "60 minutes") .createOrReplaceTempView("tv_entity_watermark") val resultDf = spark.sql( s""" |select id,sum(value) as sum_value |from tv_entity_watermark |group id |""".stripMargin) val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start() query.awaitTermination() query.stop() } }
當通過nc -lk 19999中依次(每組輸入間隔幾秒時間即可)輸入如下資料時:
1,2018-12-01 12:00:01,100 2,2018-12-01 12:00:01,100 1,2018-12-01 12:05:01,100 2,2018-12-01 12:05:01,100 1,2018-12-01 12:15:01,100 2,2018-12-01 12:15:01,100 1,2018-12-01 12:25:01,100 2,2018-12-01 12:25:01,100 1,2018-12-01 12:35:01,100 2,2018-12-01 12:35:01,100 1,2018-12-01 12:45:01,100 2,2018-12-01 12:45:01,100 1,2018-12-01 12:55:01,100 2,2018-12-01 12:55:01,100 1,2018-12-01 13:05:02,100 2,2018-12-01 13:05:02,100 1,2018-12-01 13:15:01,100 2,2018-12-01 13:15:01,100
發現最終統計結果為:
id , sum_value 1 , 900 2 , 900
而不是期望的
id , sum_value 1 , 600 2 , 600
既然是不能限定資料統計範圍是60minutes,是否需要藉助於視窗函式window就可以實現呢?
是否需要藉助於watermark和視窗函式window就可以實現最近1小時資料統計呢?
spark.sql("select id,timestamp,value from tv_entity") .withWatermark("timestamp", "60 minutes") .createOrReplaceTempView("tv_entity_watermark") val resultDf = spark.sql( s""" |select id,sum(value) as sum_value |from tv_entity_watermark |group window(timestamp,'60 minutes','60 minutes'),id |""".stripMargin) val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start()
依然輸入上邊的測試資料,會發現超過1小時候資料會重新開闢(歸零後重新統計)一個統計結果,而不是滾動的一小時統計。
就是把上邊的測試資料分為了兩組來分別統計:
第一組(小時)參與統計資料:
1,2018-12-01 12:00:01,100 2,2018-12-01 12:00:01,100 1,2018-12-01 12:05:01,100 2,2018-12-01 12:05:01,100 1,2018-12-01 12:15:01,100 2,2018-12-01 12:15:01,100 1,2018-12-01 12:25:01,100 2,2018-12-01 12:25:01,100 1,2018-12-01 12:35:01,100 2,2018-12-01 12:35:01,100 1,2018-12-01 12:45:01,100 2,2018-12-01 12:45:01,100 1,2018-12-01 12:55:01,100 2,2018-12-01 12:55:01,100
第二組(小時)參與統計資料:
1,2018-12-01 13:05:02,100 2,2018-12-01 13:05:02,100 1,2018-12-01 13:15:01,100 2,2018-12-01 13:15:01,100
猜測總結:
根據上邊測試結果可以推出一個猜測結論:
在spark structured streaming中是不儲存引數統計的資料的,只是對資料進行了maxTimestamp.avgTimestamp,minTimestamp儲存,同時只是對資料的統計結果進行儲存,下次再次觸發統計時只是在原有的統計結果之上進行累加等操作,而參與統計的資料應該是沒有儲存,否則這類需求應該是可以實現。