Big Data with Spark: Real-Time Processing -- Structured Streaming (Part 2)
阿新 · Published: 2022-03-22
How Event-Time Window Aggregation Works
- Windows are 10 minutes long and are updated every 5 minutes
- Computation starts at 12:00; windows before that are not shown
- Update times are 12:05, 12:10, 12:15, and 12:20
- Windows (left-closed, right-open intervals): 12:00--12:10, 12:05--12:15, 12:10--12:20, 12:15--12:25
- Update 1: at 12:05, show the accumulated counts for 12:00--12:10
- Update 2: at 12:10, show the accumulated counts for 12:00--12:10 and 12:05--12:15
- Update 3: at 12:15, show the accumulated counts for 12:00--12:10, 12:05--12:15, and 12:10--12:20
- Update 4: at 12:20, show the accumulated counts for 12:00--12:10, 12:05--12:15, 12:10--12:20, and 12:15--12:25
- Summary: a single record is assigned to multiple windows (see the sketch right below)
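The point above, that one event time lands in every window covering it, can be checked without Spark. The following standalone Scala sketch (not part of the course code; the 10-minute length and 5-minute slide of the example above are used as defaults) lists the windows a given timestamp belongs to:

import java.time.LocalTime

// Standalone sketch: compute every [start, end) window that contains a given
// event time, using the same left-closed, right-open convention as above.
object WindowAssignment {

  def windowsFor(ts: LocalTime, lengthMin: Long = 10, slideMin: Long = 5): Seq[(LocalTime, LocalTime)] = {
    val minuteOfDay = ts.getHour * 60L + ts.getMinute
    // the latest possible window start is the slide-aligned minute at or before ts
    val lastStart = (minuteOfDay / slideMin) * slideMin
    Iterator.iterate(lastStart)(_ - slideMin)
      .takeWhile(start => start >= 0 && start + lengthMin > minuteOfDay)
      .map { start =>
        (LocalTime.of((start / 60).toInt, (start % 60).toInt),
         LocalTime.of((((start + lengthMin) / 60) % 24).toInt, ((start + lengthMin) % 60).toInt))
      }
      .toSeq
      .reverse
  }

  def main(args: Array[String]): Unit = {
    // 12:07 falls into both [12:00, 12:10) and [12:05, 12:15)
    println(windowsFor(LocalTime.of(12, 7)))
  }
}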
Implementing Event-Time Window Aggregation
- Start dfs, yarn, zookeeper, the multi-broker Kafka service, and master, and open port 9999
- Run the program in IDEA
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
- The error reported is the same as above
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    // call method 4: eventTimeWindow
    eventTimeWindow(spark)
  }

  // method 4: eventTimeWindow
  def eventTimeWindow(spark: SparkSession): Unit = {
    import spark.implicits._

    spark.readStream.format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load.as[String]
      .map(x => {
        val splits = x.split(",")
        (splits(0), splits(1))        // (event time string, word)
      }).toDF("ts", "word")
      .groupBy(
        window($"ts", "10 minutes", "5 minutes"),
        $"word"
      ).count()
      .sort("window")
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Complete())
      .start()
      .awaitTermination()
  }
}
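One detail worth noting: sort("window") is only legal here because the query aggregates and writes in Complete output mode; Structured Streaming does not support sorting a streaming result in Append or Update mode.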
- Enter test data on port 9999
2021-10-01 12:02:00,cat
2021-10-01 12:02:00,dog
2021-10-01 12:03:00,dog
2021-10-01 12:03:00,dog
2021-10-01 12:07:00,owl
2021-10-01 12:07:00,cat
2021-10-01 12:11:00,dog
2021-10-01 12:13:00,owl
- Results
- Time 1: 12:00 -- accumulated counts for [11:55--12:05)
- Time 2: 12:05 -- accumulated counts for [12:00--12:10)
- Time 3: 12:10 -- accumulated counts for [12:05--12:15)
- Time 4: 12:15 -- accumulated counts for [12:10--12:20)
Handling Late Data and Watermarking
- Data can arrive out of order and late
- Based on the port-9999 input above, change the record 2021-10-01 12:11:00,dog to 2021-10-01 12:04:00,dog, but have it arrive at the application at 2021-10-01 12:11:00.
- The application should then use the event time 2021-10-01 12:04:00.
- So the counts in the 12:00--12:10 window have to be updated.
- Principle: Structured Streaming keeps the partial aggregates as intermediate state for an extended period, so late data can correctly update the aggregates of old windows.
- The watermark is exactly that lateness threshold
- The state of a window ending at time T is kept (and can still be updated by late data) until: max event time seen by the engine - late threshold > T; declaring the threshold in code is shown in the sketch after this list
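The course code above never declares the threshold, so here is a minimal sketch of how it could be added, assuming the same socket source, the same (ts, word) columns, and the same SourceApp object as eventTimeWindow above. The ts column is cast to a timestamp because withWatermark requires one, and Update output mode is used since Complete mode keeps every window and never drops state:

  // Sketch only: eventTimeWindow plus a 10-minute late threshold.
  def eventTimeWindowWithWatermark(spark: SparkSession): Unit = {
    import spark.implicits._

    spark.readStream.format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load.as[String]
      .map(x => {
        val splits = x.split(",")
        (splits(0), splits(1))
      }).toDF("ts", "word")
      .withColumn("ts", $"ts".cast("timestamp"))   // withWatermark needs a timestamp column
      .withWatermark("ts", "10 minutes")           // late threshold: 10 minutes
      .groupBy(
        window($"ts", "10 minutes", "5 minutes"),
        $"word"
      ).count()
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Update())             // sorting is dropped: not supported in Update mode
      .start()
      .awaitTermination()
  }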
File Sink
- Start all services and open port 9999
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SinkApp.scala
package com.imooc.spark.sss

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object SinkApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    fileSink(spark)
  }

  def fileSink(spark: SparkSession): Unit = {
    import spark.implicits._

    spark.readStream
      .format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load().as[String]
      .flatMap(_.split(","))          // each comma-separated token becomes one row
      .map(x => (x, "pk"))            // attach a constant second column
      .toDF("word", "new_word")
      .writeStream
      .format("json")
      .option("path", "out")
      .option("checkpointLocation", "chk")
      .start()
      .awaitTermination()
  }
}
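The json sink writes its output as part files under the configured path; a quick way to inspect them is to read the directory back as a normal batch DataFrame (using the "out" path configured above):

  // read the JSON files produced by the file sink and print them
  val written = spark.read.json("out")
  written.show(false)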
Kafka Sink
- Shut down the producer from the previous section and start a consumer
- The data will be received there
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/kafka_2.12-2.5.0/bin
[hadoop@spark000 bin]$ ./kafka-console-producer.sh --broker-list spark000:9092 --topic ssskafkatopic
>^C
[hadoop@spark000 bin]$ ./kafka-console-consumer.sh --bootstrap-server spark000:9092 --topic ssskafkatopic
- Open port 9999
- Enter test data on port 9999
- IDEA code
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SinkApp.scala
package com.imooc.spark.sss

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object SinkApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    kafkaSink(spark)
  }

  def kafkaSink(spark: SparkSession): Unit = {
    import spark.implicits._

    // The socket source already exposes a string "value" column,
    // which is exactly what the Kafka sink expects.
    spark.readStream.format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load().as[String]
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark000:9092")
      .option("topic", "ssskafkatopic")
      .option("checkpointLocation", "kafka-chk")
      .start()
      .awaitTermination()
  }
}
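Besides the console consumer above, the topic can also be read back with Spark itself to confirm what the sink produced. A minimal sketch, assuming the spark-sql-kafka dependency needed by the kafka format is already on the classpath and using the broker and topic configured above:

  // batch-read the whole topic and decode the value bytes to strings
  val fromKafka = spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "spark000:9092")
    .option("subscribe", "ssskafkatopic")
    .load()
    .selectExpr("CAST(value AS STRING) AS value")

  fromKafka.show(false)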
Foreach Sink to MySQL
- Start all services and open port 9999
- Enter data on port 9999
- Create the table in MySQL
[hadoop@spark000 ~]$ mysql -uroot -proot
mysql> show databases;
mysql> use jieqiong;
Database changed
mysql> create table t_wc(
    -> word varchar(20) not null,
    -> cnt int not null,
    -> primary key (word)
    -> );
Query OK, 0 rows affected (0.01 sec)

mysql> select * from t_wc;
Empty set (0.01 sec)
- Add the dependency
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
</dependency>
- IDEA
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SinkApp.scala
package com.imooc.spark.sss

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object SinkApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    mysqlSink(spark)
  }

  def mysqlSink(spark: SparkSession): Unit = {
    import spark.implicits._

    spark.readStream
      .format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load().as[String]
      .flatMap(_.split(","))
      .groupBy("value")
      .count()
      .repartition(2)
      .writeStream
      .outputMode(OutputMode.Update())
      .foreach(new ForeachWriter[Row] {

        var connection: Connection = _
        var pstmt: PreparedStatement = _
        var batchCount = 0

        override def process(value: Row): Unit = {
          println("processing data...")
          val word = value.getString(0)
          val cnt = value.getLong(1).toInt
          println(s"word:$word, cnt:$cnt...")

          pstmt.setString(1, word)
          pstmt.setInt(2, cnt)
          pstmt.setString(3, word)
          pstmt.setInt(4, cnt)
          pstmt.addBatch()

          batchCount += 1
          if (batchCount >= 10) {     // flush every 10 buffered rows
            pstmt.executeBatch()
            batchCount = 0
          }
        }

        override def close(errorOrNull: Throwable): Unit = {
          println("closing...")
          pstmt.executeBatch()        // flush whatever is left in the batch
          batchCount = 0
          connection.close()
        }

        override def open(partitionId: Long, epochId: Long): Boolean = {
          println(s"opening connection: $partitionId, $epochId")
          Class.forName("com.mysql.jdbc.Driver")
          connection = DriverManager.getConnection("jdbc:mysql://spark000:3306/jieqiong", "root", "root")
          val sql =
            """
              |insert into t_wc(word,cnt)
              |values(?,?)
              |on duplicate key update word=?,cnt=?;
              |""".stripMargin
          pstmt = connection.prepareStatement(sql)
          connection != null && !connection.isClosed && pstmt != null
        }
      })
      .start()
      .awaitTermination()
  }
}
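A note on the design: the INSERT ... ON DUPLICATE KEY UPDATE statement turns each write into an upsert keyed on word, which matches Update output mode, since every trigger re-emits the latest count for any word that changed and simply overwrites the previous row. Writes are batched: process() flushes to MySQL after 10 buffered rows, and close() flushes whatever remains at the end of each epoch.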
Fault-Tolerance Semantics
- Structured Streaming can guarantee end-to-end exactly-once semantics under any failure, provided the source is replayable and the sink is idempotent; offsets are tracked through checkpointing and write-ahead logs (a recovery sketch follows).
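That guarantee leans on the checkpointLocation option already used by the sinks above, plus a replayable source; the socket source used in the demos is not replayable, so a source such as Kafka is needed for true exactly-once. A minimal sketch, reusing the ssskafkatopic topic and broker from above; the output and checkpoint directory names are made up for illustration:

  // Replayable source (Kafka) + file sink + checkpoint: restarting the query with
  // the same checkpointLocation resumes from the offsets recorded in the
  // write-ahead log, so no micro-batch is lost or written twice.
  def exactlyOnceSketch(spark: SparkSession): Unit = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark000:9092")
      .option("subscribe", "ssskafkatopic")
      .load()
      .selectExpr("CAST(value AS STRING) AS word")
      .writeStream
      .format("json")
      .option("path", "exactly-once-out")                 // hypothetical output directory
      .option("checkpointLocation", "exactly-once-chk")   // keep this directory across restarts
      .start()
      .awaitTermination()
  }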