大資料Spark實時處理--結構化流1(Structured Streaming)
阿新 • • 發佈:2022-03-22
Spark Streaming的不足
- 1)基於ProcessingTime
- 在資料處理過程中,是有幾個時間的:
- ProcessingTime vs EventTime
- 12:00:00 資料的真正產生時間 :EventTime
- 12:01:10 進入Spark的時間 :ProcessingTime
- 2)Complex API
- DStream RDD
- 一個業務交給不同的開發人員去實現,可能最終的效能千差萬別
- 3)批流程式碼不統一
- 4)end to end 端到端
Structured Streaming的概述
- 1)Structured Streaming Programming Guide - Spark 3.2.0 Documentation (apache.org)
- 2)結構化流:構建在Spark SQL引擎之上的,可擴充套件、可容錯的流處理引擎
- 3)Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.
- 4)In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
- 5)processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees.
第一個Structured Streaming應用程式
- 1)左擊專案----->new----->Module----->Maven----->next
- Artifactld:log-sss
- Module name:log-sss
- 2)導spark-sql、spark-core、spark-streaming的依賴
- 3)log-sss------src------main-------new directory:scala
- 4)右擊scala-------Mark Directory as-------Sources Root
- 5)file-----project structure-----project settings------modules-----log-sss------add-----scala------ok
- 6)右擊scala------new package------com.imooc.spark.sss
- 7)右擊com.imooc.spark.sss-------new------scala class------wc-----object
- 8)報錯,先暫停
package com.imooc.spark.sss import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession /* 基於spark sql */ object WordCount { //第1步:main def main(args: Array[String]): Unit = { //第2步:拿到SparkSession---builder---設定master // ----設定app的名字---.getOrCreate()----var-----拿到session,改名為spark //這裡是入口 val spark = SparkSession .builder .master("local[2]") .appName(this.getClass.getName) //.appName("StructuredNetworkWordCount") .getOrCreate() import spark.implicits._ //第4步: 對接socket資料 //4.1 拿到spark,並讀流資料----使用socket的方式-----這裡是資料來源 //spark.readStream.format("socket") //4.2 資料是來源的機器 //4.3 資料load進來-----.load().var---改為lines val lines = spark.readStream .format("socket") .option("host", "spark000") .option("port", 9999) .load() //第6步: val words = lines.as[String].flatMap(_.split(" ")) .groupBy("value") .count() //第5步: 將上述lines展示出來 //5.1 將lines寫出去----寫到控制檯---開始----終止 val query = words.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination() // //第3步:關閉----框架ok----因為是雲平臺,不能停止 // spark.stop() } }
Structured Streaming程式設計模型
- 核心思想:結構化流的關鍵思想是將實時資料流視為一個不斷追加的表。它與批處理模型非常相似。無界表上的增量輸入。
- Complete mode
- Append mode
- Update mode
處理EventTime和延遲資料
-
EventTime是嵌入資料本身的時間。對於許多應用程式,希望是基於EventTime進行操作。
-
例如,如果希望獲得每分鐘由物聯網裝置生成的Event數量,那麼可能希望使用生成資料的時間EventTime(即資料中的EventTime),而不是Spark接收資料的時間。這個EventTime在這個模型中非常自然地表示出來——來自裝置的每個事件都是表中的一行,而EventTime是行中的一列值。這允許基於視窗的聚合(例如,每分鐘的事件數)只是事件時間列上的一種特殊型別的分組和聚合——每個時間視窗是一個組,每一行可以屬於多個視窗/組。因此,可以在靜態資料集(例如,從收集的裝置事件日誌)以及資料流上一致地定義這種基於事件時間視窗的聚合查詢,從而使使用者的生活更加輕鬆
- 此外,結構化流模型自然會根據EventTime處理比預期晚到的資料。由於Spark可以更新結果表,因此它可以完全控制在有延遲資料時更新舊聚合,以及清理舊聚合以限制中間狀態資料的大小。從Spark 2.1開始,我們就支援水印,它允許使用者指定延遲資料的閾值,並允許引擎相應地清除舊狀態。稍後將在“視窗操作”一節中詳細解釋這些操作。
使用SQL完成統計分析
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\WCApp.scala
- 目前報錯
package com.imooc.spark.sss import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.OutputMode object WCApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]") .appName(this.getClass.getName).getOrCreate() import spark.implicits._ val lines = spark.readStream.format("socket") .option("host", "spark000") .option("port", 9999) .load() val wordCount = lines.as[String] .flatMap(_.split(",")) .createOrReplaceTempView("wc") spark.sql( """ |select |value, count(1) as cnt |from |wc |group by value """.stripMargin) .writeStream .outputMode(OutputMode.Complete()) .format("console") .start() .awaitTermination() } }
對接csv資料來源資料
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\data\csv\emp.csv
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
- 報錯同上
package com.imooc.spark.sss import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.functions.window object SourceApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]") .appName(this.getClass.getName).getOrCreate() //呼叫方法1---readCsvPartition readCsv(spark) } //方法1---readCsvPartition def readCsv(spark:SparkSession): Unit = { val userSchema = new StructType() .add("id", IntegerType) .add("name",StringType) .add("city", StringType) spark.readStream .format("csv") .schema(userSchema) .load("log-sss/data/csv") .groupBy("city") .count() .writeStream .outputMode(OutputMode.Complete()) .format("console") .start() .awaitTermination() }
對接分割槽資料來源資料
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\data\partition
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
- 報錯同上
package com.imooc.spark.sss import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.functions.window object SourceApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]") .appName(this.getClass.getName).getOrCreate() //呼叫方法2---readCsvPartition readCsvPartition(spark) }//方法2---readCsvPartition def readCsvPartition(spark:SparkSession): Unit = { val userSchema = new StructType() .add("id", IntegerType) .add("name",StringType) .add("city", StringType) spark.readStream .format("csv") .schema(userSchema) .load("log-sss/data/partition") .writeStream .format("console") .start() .awaitTermination() }
對接Kafka資料來源資料
- 啟動dfs、yarn、zookeeper、多broker的kafka
- 建立topic
[hadoop@spark000 bin]$ pwd /home/hadoop/app/kafka_2.12-2.5.0/bin [hadoop@spark000 bin]$ ./kafka-topics.sh --create --zookeeper spark000:2181 --replication-factor 1 --partitions 1 --topic ssskafkatopic Created topic ssskafkatopic.
- 啟動producer
- 在IDEA執行後,在此處輸入資料,在控制檯展示結果
[hadoop@spark000 bin]$ pwd /home/hadoop/app/kafka_2.12-2.5.0/bin [hadoop@spark000 bin]$ ./kafka-console-producer.sh --broker-list spark000:9092 --topic ssskafkatopic
- IDEA新增依賴
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\pom.xml
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> </dependency>
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
- 報錯同上
package com.imooc.spark.sss import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.functions.window object SourceApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]") .appName(this.getClass.getName).getOrCreate()//呼叫方法3---kafkaSource kafkaSource(spark) }//方法3---kafkaSource def kafkaSource(spark:SparkSession): Unit = { import spark.implicits._ spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "spark000:9092") .option("subscribe", "ssskafkatopic") .load() .selectExpr("CAST(value AS STRING)") .as[String].flatMap(_.split(",")) .groupBy("value").count() .writeStream .format("console") .outputMode(OutputMode.Update()) .start() .awaitTermination() } }