1. 程式人生 > 其它 >Spark Streaming、離線計算、實時計算、實時查詢、Spark Streaming 原理、Spark Streaming WordCount、Spark Streaming 架構圖

Spark Streaming、離線計算、實時計算、實時查詢、Spark Streaming 原理、Spark Streaming WordCount、Spark Streaming 架構圖

Spark Streaming、離線計算、實時計算、實時查詢、Spark Streaming 原理、Spark Streaming WordCount、Spark Streaming 架構圖

目錄

Spark Streaming

spark 中 最重要的就是 spark core 和 spark sql (也就是之前筆記的內容)

離線計算、實時計算、實時查詢

Spark Streaming 原理

Spark Streaming WordCount

1、匯入依賴

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

以下列出 spark 專案 目前所需的所有依賴和外掛

    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

2、WordCount 示例

package com.shujia.stream

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Demo1WordCount {
  def main(args: Array[String]): Unit = {

    /**
      * 1、建立 SparkContext
      *
      */
    val conf: SparkConf = new SparkConf()
      //在local後面可以通過[]指定一個引數,指定一個任務執行時使用的CPU核數,不能超過自己電腦的CPU核數
      //因為在處理資料的時候還要接收資料,所以這裡要指定為 2 才能跑起來
      .setMaster("local[2]")
      .setAppName("stream")

    val sc = new SparkContext(conf)

    /**
      * 2、建立 SparkStreaming 環境
      *
      * 需要SparkContext物件+指定batch的間隔時間(多久處理一次)
      *
      */

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    /**
      * 實時計算的資料來源一般是 Kafka
      * 因為目前 Kafka 沒有學,所以通過 socket 去模擬實時環境
      *
      * 讀取socket中的資料 -- socketTextStream("主機名","埠號")
      *
      * 在Linux中啟動 socket 服務
      * 沒有nc命令的話,要yum一個
      * yum install nc
      * nc -lk 8888
      * 之後就可以在Linux的shell中輸入資料,然後誰連線了我指定的埠號,誰就能拿到我輸入的資料
      *
      */

    /**
      * DStream : Spark Streaming 的程式設計模型,其底層也是RDD,每隔5秒將資料封裝成一個rdd
      */

    val lineDS: DStream[String] = ssc.socketTextStream("master", 8888)

    /**
      * 統計單詞的數量
      *
      */

    val wordsDS: DStream[String] = lineDS.flatMap(line => line.split(","))

    val kvDS: DStream[(String, Int)] = wordsDS.map(word => (word, 1))

    val countDS: DStream[(String, Int)] = kvDS.reduceByKey(_ + _)

    countDS.print()

    /**
      * 啟動Spark Streaming程式
      * 這幾行程式碼是必須的,這和RDD中不一樣
      */

    ssc.start()//啟動
    ssc.awaitTermination()//等待關閉
    ssc.stop()//關閉

  }
}

Spark Streaming 架構圖