
Big Data Spark Real-Time Processing -- Structured Streaming, Part 2

How EventTime-based window aggregation works

  • A 10-minute window, updated every 5 minutes
  • Computation starts at 12:00; windows before that are hidden
  • Update times: 12:05, 12:10, 12:15, 12:20
  • Windows (closed on the left, open on the right): 12:00--12:10, 12:05--12:15, 12:10--12:20, 12:15--12:25
  • Update 1 at 12:05: shows the accumulated counts for 12:00--12:10
  • Update 2 at 12:10: shows the accumulated counts for 12:00--12:10 and 12:05--12:15
  • Update 3 at 12:15: shows the accumulated counts for 12:00--12:10, 12:05--12:15 and 12:10--12:20
  • Update 4 at 12:20: shows the accumulated counts for 12:00--12:10, 12:05--12:15, 12:10--12:20 and 12:15--12:25
  • Summary: a single record can be assigned to more than one window (see the sketch below)
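The window assignment itself is plain arithmetic on the event time. A minimal, Spark-free sketch of that assignment (the helper windowsFor is purely illustrative and not part of the course code; Spark's window() performs this step internally for every record):

import java.time.LocalTime

// List every [start, end) window of sizeMin minutes, sliding every slideMin minutes,
// that contains the given event time.
def windowsFor(t: LocalTime, sizeMin: Int = 10, slideMin: Int = 5): Seq[(LocalTime, LocalTime)] = {
  val minutes   = t.getHour * 60 + t.getMinute
  val lastStart = (minutes / slideMin) * slideMin                 // latest window start <= t
  Iterator.iterate(lastStart)(_ - slideMin)
    .takeWhile(start => start > minutes - sizeMin && start >= 0)  // window must still cover t
    .map { start =>
      val end = start + sizeMin
      (LocalTime.of(start / 60, start % 60), LocalTime.of((end / 60) % 24, end % 60))
    }
    .toSeq
}

// 12:02 falls into [11:55, 12:05) and [12:00, 12:10): one record, two windows.
println(windowsFor(LocalTime.of(12, 2)))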

 

Implementing EventTime-based window aggregation

  • Start dfs, yarn, zookeeper, the multi-broker Kafka service, the master, and port 9999
  • Run the program in IDEA
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
  • The same error as in the previous section
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    // call method 4 --- eventTimeWindow
    eventTimeWindow(spark)
  }

  // method 4 --- eventTimeWindow
  def eventTimeWindow(spark: SparkSession): Unit = {
    import spark.implicits._
    spark.readStream.format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load().as[String]
      .map(x => {
        val splits = x.split(",")
        (splits(0), splits(1))
      }).toDF("ts", "word")
      .groupBy(
        window($"ts", "10 minutes", "5 minutes"),
        $"word"
      ).count()
      .sort("window")
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Complete())
      .start()
      .awaitTermination()
  }
}
  • Enter the following data on port 9999
2021-10-01 12:02:00,cat
2021-10-01 12:02:00,dog
2021-10-01 12:03:00,dog
2021-10-01 12:03:00,dog
2021-10-01 12:07:00,owl
2021-10-01 12:07:00,cat
2021-10-01 12:11:00,dog
2021-10-01 12:13:00,owl
  • Results (a local sanity check of the per-window counts follows below)
  • Time 1: 12:00 --- accumulated counts for [11:55, 12:05)
  • Time 2: 12:05 --- accumulated counts for [12:00, 12:10)
  • Time 3: 12:10 --- accumulated counts for [12:05, 12:15)
  • Time 4: 12:15 --- accumulated counts for [12:10, 12:20)
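Before checking the console, the expected per-window counts can be derived locally from the eight sample records. A Spark-free sanity-check sketch (plain Scala collections, purely illustrative):

// Assign each sample record to its two 10-minute windows (slide = 5 minutes)
// and count the (window, word) pairs.
val records = Seq(
  ("12:02", "cat"), ("12:02", "dog"), ("12:03", "dog"), ("12:03", "dog"),
  ("12:07", "owl"), ("12:07", "cat"), ("12:11", "dog"), ("12:13", "owl")
)

def minutes(ts: String): Int = {
  val Array(h, m) = ts.split(":")
  h.toInt * 60 + m.toInt
}

val counts = records
  .flatMap { case (ts, word) =>
    val t = minutes(ts)
    val lastStart = (t / 5) * 5
    Iterator.iterate(lastStart)(_ - 5).takeWhile(_ > t - 10).map(start => (start, word)).toSeq
  }
  .groupBy(identity)
  .map { case ((start, word), hits) => (start, word, hits.size) }
  .toSeq
  .sortBy { case (start, word, _) => (start, word) }

counts.foreach { case (start, word, n) =>
  println(f"[${start / 60}%02d:${start % 60}%02d, ${(start + 10) / 60}%02d:${(start + 10) % 60}%02d)  $word%-3s -> $n")
}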

 

Late data handling and Watermark

  • Data can arrive out of order and late
  • Based on the port-9999 input above: instead of the record 2021-10-01 12:11:00,dog, feed in 2021-10-01 12:04:00,dog, but have it arrive at 2021-10-01 12:11:00.
  • The application should then use the event time 2021-10-01 12:04:00, not the arrival time.
  • So the counts in the 12:00--12:10 window have to be updated.
  • Principle: Structured Streaming can keep the intermediate state of partial aggregations for a long period of time, so that late data can correctly update the aggregates of old windows.
  • The Watermark is that lateness threshold (declared in code as shown in the sketch below)
  • The state of a window ending at time T is dropped once: max event time seen by the engine - late threshold > T
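In code the threshold is declared with withWatermark on the event-time column, before the windowed aggregation. A minimal sketch of how the eventTimeWindow query above could be extended with a 10-minute watermark (the delay value is an arbitrary choice for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode

// Same windowed count as eventTimeWindow, plus a watermark: the state of a window
// ending at time T is kept until max event time seen by the engine - 10 minutes > T,
// so records up to 10 minutes late can still update their old windows.
def eventTimeWindowWithWatermark(spark: SparkSession): Unit = {
  import spark.implicits._
  spark.readStream.format("socket")
    .option("host", "spark000")
    .option("port", 9999)
    .load().as[String]
    .map(x => { val splits = x.split(","); (splits(0), splits(1)) })
    .toDF("ts", "word")
    .withColumn("ts", $"ts".cast("timestamp"))   // withWatermark requires a timestamp column
    .withWatermark("ts", "10 minutes")
    .groupBy(window($"ts", "10 minutes", "5 minutes"), $"word")
    .count()
    .writeStream
    .format("console")
    .option("truncate", "false")
    // Update mode instead of Complete: Complete mode never drops old state, and
    // sort("window") is omitted because sorting a stream is only allowed in Complete mode.
    .outputMode(OutputMode.Update())
    .start()
    .awaitTermination()
}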

 

File Sink

  • Start all services and port 9999
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SinkApp.scala

 

package com.imooc.spark.sss

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object SinkApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    fileSink(spark)
  }

  def fileSink(spark:SparkSession): Unit = {
    import spark.implicits._
    spark.readStream
      .format("socket")
      .option("host","spark000")
      .option("port",9999)
      .load().as[String]
      .flatMap(_.split(","))
      .map(x => (x,"pk"))
      .toDF("word","new_word")
      .writeStream
      .format("json")
      .option("path","out")
      .option("checkpointLocation","chk")
      .start()
      .awaitTermination()
  }
}
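After a few micro-batches have been written, the sink can be verified with a plain batch read of the JSON files (verification only, reusing the relative output path "out" from the code above; run it from the same working directory):

// Each comma-separated token typed on port 9999 becomes one JSON record
// with the two columns word and new_word.
val written = spark.read.json("out")
written.printSchema()
written.show(false)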

 

Kafka Sink

  • Shut down the producer from the previous section and start a consumer
  • The data will be received here
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/kafka_2.12-2.5.0/bin
[hadoop@spark000 bin]$ ./kafka-console-producer.sh --broker-list spark000:9092 --topic ssskafkatopic
>^C[hadoop@spark000 bin]$ 
[hadoop@spark000 bin]$ 
[hadoop@spark000 bin]$ ./kafka-console-consumer.sh --bootstrap-server spark000:9092 --topic ssskafkatopic
  • Start port 9999
  • Enter test data on port 9999
  • IDEA code
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SinkApp.scala
package com.imooc.spark.sss

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object SinkApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    kafkaSink(spark)
  }

  def kafkaSink(spark:SparkSession): Unit = {
    import spark.implicits._
    spark.readStream.format("socket")
      .option("host","spark000")
      .option("port",9999)
      .load().as[String]
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark000:9092")
      .option("topic", "ssskafkatopic")
      .option("checkpointLocation","kafka-chk")
      .start()
      .awaitTermination()
  }
}
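The kafka sink writes whatever is in the value column, and the socket source already exposes each input line as a string value column, so no extra projection is needed above. Besides the console consumer, the topic can also be read back with Spark itself; a minimal sketch, assuming the same broker and topic:

// Read the topic back as a stream and print the payloads to the console.
// Kafka records come back with binary key/value columns, hence the cast.
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "spark000:9092")
  .option("subscribe", "ssskafkatopic")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()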

 

ForeachSink to MySQL

  • Start all services and port 9999
  • Enter data on port 9999
  • Create the table in MySQL
[hadoop@spark000 ~]$ mysql -uroot -proot
mysql> show databases;
mysql> use jieqiong;
Database changed
mysql> create table t_wc(
    -> word varchar(20) not null,
    -> cnt int not null,
    -> primary key (word)
    -> );
Query OK, 0 rows affected (0.01 sec)

mysql> select * from t_wc;
Empty set (0.01 sec)
  • Add the dependency
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
  • IDEA
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SinkApp.scala
package com.imooc.spark.sss

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object SinkApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    mysqlSink(spark)
  }

  def mysqlSink(spark:SparkSession): Unit = {
    import spark.implicits._
    spark.readStream
      .format("socket")
      .option("host","spark000")
      .option("port",9999)
      .load().as[String]
      .flatMap(_.split(","))
      .groupBy("value")
      .count()
      .repartition(2)
      .writeStream
      .outputMode(OutputMode.Update())
      .foreach(new ForeachWriter[Row] {
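        // ForeachWriter lifecycle: for each partition of each micro-batch (epoch), Spark
        // calls open(partitionId, epochId); if it returns true, process(row) runs for every
        // row in that partition, and close() is always called at the end (even on error),
        // which is why the remaining JDBC batch is flushed there as well.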
        var connection:Connection = _
        var pstmt:PreparedStatement = _
        var batchCount = 0

        override def process(value: Row): Unit = {
          println("處理資料...")

          val word = value.getString(0)
          val cnt = value.getLong(1).toInt

          println(s"word:$word, cnt:$cnt...")

          pstmt.setString(1, word)
          pstmt.setInt(2, cnt)
          pstmt.setString(3, word)
          pstmt.setInt(4, cnt)
          pstmt.addBatch()

          batchCount += 1
          if(batchCount >= 10) {
            pstmt.executeBatch()
            batchCount = 0
          }

        }

        override def close(errorOrNull: Throwable): Unit = {
          println("關閉...")
          pstmt.executeBatch()
          batchCount = 0
          connection.close()
        }

        override def open(partitionId: Long, epochId: Long): Boolean = {
          println(s"開啟connection: $partitionId, $epochId")
          Class.forName("com.mysql.jdbc.Driver")
          connection = DriverManager.getConnection("jdbc:mysql://spark000:3306/jieqiong","root","root")

          val sql =
            """
              |insert into t_wc(word,cnt)
              |values(?,?)
              |on duplicate key update word=?,cnt=?;
              |
              """.stripMargin

          pstmt = connection.prepareStatement(sql)

          connection!=null && !connection.isClosed && pstmt != null
        }
      })
      .start()
      .awaitTermination()
  }
}
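Besides select * from t_wc in the mysql client, the table can also be read back from Spark through the same connector for a quick check; a minimal batch sketch, reusing the connection settings from the JDBC URL above:

// Verification only: batch-read the upserted word counts.
spark.read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://spark000:3306/jieqiong")
  .option("dbtable", "t_wc")
  .option("user", "root")
  .option("password", "root")
  .load()
  .show(false)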

 

Fault-tolerance semantics

  • Structured Streaming can guarantee end-to-end exactly-once semantics under any failure, provided the source is replayable (e.g. Kafka), offsets and state are checkpointed, and the sink is idempotent.