Spark Streaming、離線計算、實時計算、實時查詢、Spark Streaming 原理、Spark Streaming WordCount、Spark Streaming 架構圖
阿新 • • 發佈:2022-03-14
Spark Streaming、離線計算、實時計算、實時查詢、Spark Streaming 原理、Spark Streaming WordCount、Spark Streaming 架構圖
目錄
- Spark Streaming
- 離線計算、實時計算、實時查詢
- Spark Streaming 原理
- Spark Streaming WordCount
- Spark Streaming 架構圖
Spark Streaming
spark 中 最重要的就是 spark core 和 spark sql (也就是之前筆記的內容)
離線計算、實時計算、實時查詢
Spark Streaming 原理
Spark Streaming WordCount
1、匯入依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.5</version>
</dependency>
以下列出 spark 專案 目前所需的所有依賴和外掛
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.40</version> </dependency> </dependencies> <build> <plugins> <!-- Java Compiler --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <!-- Scala Compiler --> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
2、WordCount 示例
package com.shujia.stream
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object Demo1WordCount {
def main(args: Array[String]): Unit = {
/**
* 1、建立 SparkContext
*
*/
val conf: SparkConf = new SparkConf()
//在local後面可以通過[]指定一個引數,指定一個任務執行時使用的CPU核數,不能超過自己電腦的CPU核數
//因為在處理資料的時候還要接收資料,所以這裡要指定為 2 才能跑起來
.setMaster("local[2]")
.setAppName("stream")
val sc = new SparkContext(conf)
/**
* 2、建立 SparkStreaming 環境
*
* 需要SparkContext物件+指定batch的間隔時間(多久處理一次)
*
*/
val ssc = new StreamingContext(sc, Durations.seconds(5))
/**
* 實時計算的資料來源一般是 Kafka
* 因為目前 Kafka 沒有學,所以通過 socket 去模擬實時環境
*
* 讀取socket中的資料 -- socketTextStream("主機名","埠號")
*
* 在Linux中啟動 socket 服務
* 沒有nc命令的話,要yum一個
* yum install nc
* nc -lk 8888
* 之後就可以在Linux的shell中輸入資料,然後誰連線了我指定的埠號,誰就能拿到我輸入的資料
*
*/
/**
* DStream : Spark Streaming 的程式設計模型,其底層也是RDD,每隔5秒將資料封裝成一個rdd
*/
val lineDS: DStream[String] = ssc.socketTextStream("master", 8888)
/**
* 統計單詞的數量
*
*/
val wordsDS: DStream[String] = lineDS.flatMap(line => line.split(","))
val kvDS: DStream[(String, Int)] = wordsDS.map(word => (word, 1))
val countDS: DStream[(String, Int)] = kvDS.reduceByKey(_ + _)
countDS.print()
/**
* 啟動Spark Streaming程式
* 這幾行程式碼是必須的,這和RDD中不一樣
*/
ssc.start()//啟動
ssc.awaitTermination()//等待關閉
ssc.stop()//關閉
}
}