1. 程式人生 > 其它 >大資料Spark實時處理--結構化流1(Structured Streaming)

大資料Spark實時處理--結構化流1(Structured Streaming)

Spark Streaming的不足

  • 1)基於ProcessingTime
  • 在資料處理過程中,是有幾個時間的:
  • ProcessingTime  vs   EventTime
  • 12:00:00 資料的真正產生時間   :EventTime
  • 12:01:10 進入Spark的時間        :ProcessingTime
  • 2)Complex API
  • DStream      RDD
  • 一個業務交給不同的開發人員去實現,可能最終的效能千差萬別
  • 3)批流程式碼不統一
  • 4)end to end 端到端

 

Structured Streaming的概述

  • 1)Structured Streaming Programming Guide - Spark 3.2.0 Documentation (apache.org)
  • 2)結構化流:構建在Spark SQL引擎之上的可擴充套件、可容錯的流處理引擎
  • 3)Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. 
  • 4)In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
  • 5)processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. 

 

第一個Structured Streaming應用程式

  • 1)左擊專案----->new----->Module----->Maven----->next
  • Artifactld:log-sss
  • Module name:log-sss
  • 2)導spark-sql、spark-core、spark-streaming的依賴
  • 3)log-sss------src------main-------new directory:scala
  • 4)右擊scala-------Mark Directory as-------Sources Root
  • 5)file-----project structure-----project settings------modules-----log-sss------add-----scala------ok
  • 6)右擊scala------new package------com.imooc.spark.sss
  • 7)右擊com.imooc.spark.sss-------new------scala class------wc-----object
  • 8)報錯,先暫停

 

package com.imooc.spark.sss

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

/*
 基於spark sql
 */
object WordCount {
  //第1步:main
  def main(args: Array[String]): Unit = {
    //第2步:拿到SparkSession---builder---設定master
    // ----設定app的名字---.getOrCreate()----var-----拿到session,改名為spark
    //這裡是入口
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName(this.getClass.getName)
      //.appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

    //第4步: 對接socket資料
    //4.1 拿到spark,並讀流資料----使用socket的方式-----這裡是資料來源
    //spark.readStream.format("socket")
    //4.2 資料是來源的機器
    //4.3 資料load進來-----.load().var---改為lines
    val lines = spark.readStream
      .format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load()

    //第6步:
    val words = lines.as[String].flatMap(_.split(" "))
      .groupBy("value")
      .count()

    //第5步: 將上述lines展示出來
    //5.1 將lines寫出去----寫到控制檯---開始----終止
    val query = words.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()


//    //第3步:關閉----框架ok----因為是雲平臺,不能停止
//    spark.stop()
  }
}

 

Structured Streaming程式設計模型

  • 核心思想:結構化流的關鍵思想是將實時資料流視為一個不斷追加的表。它與批處理模型非常相似。無界表上的增量輸入。
  • Complete mode
  • Append mode
  • Update mode

 

處理EventTime和延遲資料

  • EventTime是嵌入資料本身的時間。對於許多應用程式,希望是基於EventTime進行操作。

  • 例如,如果希望獲得每分鐘由物聯網裝置生成的Event數量,那麼可能希望使用生成資料的時間EventTime(即資料中的EventTime),而不是Spark接收資料的時間。這個EventTime在這個模型中非常自然地表示出來——來自裝置的每個事件都是表中的一行,而EventTime是行中的一列值。這允許基於視窗的聚合(例如,每分鐘的事件數)只是事件時間列上的一種特殊型別的分組和聚合——每個時間視窗是一個組,每一行可以屬於多個視窗/組。因此,可以在靜態資料集(例如,從收集的裝置事件日誌)以及資料流上一致地定義這種基於事件時間視窗的聚合查詢,從而使使用者的生活更加輕鬆

  •  此外,結構化流模型自然會根據EventTime處理比預期晚到的資料。由於Spark可以更新結果表,因此它可以完全控制在有延遲資料時更新舊聚合,以及清理舊聚合以限制中間狀態資料的大小。從Spark 2.1開始,我們就支援水印,它允許使用者指定延遲資料的閾值,並允許引擎相應地清除舊狀態。稍後將在“視窗操作”一節中詳細解釋這些操作。

 

使用SQL完成統計分析

  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\WCApp.scala
  • 目前報錯

 

package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object WCApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    import spark.implicits._
    val lines = spark.readStream.format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load()

    val wordCount = lines.as[String]
      .flatMap(_.split(","))
      .createOrReplaceTempView("wc")

    spark.sql(
      """
        |select
        |value, count(1) as cnt
        |from
        |wc
        |group by value
      """.stripMargin)
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()

  }

}

 

 

對接csv資料來源資料

  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\data\csv\emp.csv
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
  • 報錯同上

 

package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()
//呼叫方法1---readCsvPartition
    readCsv(spark)

  }

  //方法1---readCsvPartition
  def readCsv(spark:SparkSession): Unit = {

    val userSchema = new StructType()
      .add("id", IntegerType)
      .add("name",StringType)
      .add("city", StringType)

    spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("log-sss/data/csv")
      .groupBy("city")
      .count()
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
  }

 

 

對接分割槽資料來源資料

  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\data\partition
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
  • 報錯同上

 

package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()
//呼叫方法2---readCsvPartition
    readCsvPartition(spark)
  }//方法2---readCsvPartition
  def readCsvPartition(spark:SparkSession): Unit = {

    val userSchema = new StructType()
      .add("id", IntegerType)
      .add("name",StringType)
      .add("city", StringType)

    spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("log-sss/data/partition")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }

 

對接Kafka資料來源資料

  • 啟動dfs、yarn、zookeeper、多broker的kafka
  • 建立topic
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/kafka_2.12-2.5.0/bin
[hadoop@spark000 bin]$ ./kafka-topics.sh --create --zookeeper spark000:2181 --replication-factor 1 --partitions 1 --topic ssskafkatopic
Created topic ssskafkatopic.
  • 啟動producer
  • 在IDEA執行後,在此處輸入資料,在控制檯展示結果
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/kafka_2.12-2.5.0/bin
[hadoop@spark000 bin]$ ./kafka-console-producer.sh --broker-list spark000:9092 --topic ssskafkatopic
  • IDEA新增依賴
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\pom.xml
<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
</dependency>
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
  • 報錯同上
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()//呼叫方法3---kafkaSource
    kafkaSource(spark)

  }//方法3---kafkaSource
  def kafkaSource(spark:SparkSession): Unit = {
    import spark.implicits._
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark000:9092")
      .option("subscribe", "ssskafkatopic")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String].flatMap(_.split(","))
      .groupBy("value").count()
      .writeStream
      .format("console")
      .outputMode(OutputMode.Update())
      .start()
      .awaitTermination()
  }
}