1. 程式人生 > 實用技巧 >Spark(十六)【SparkStreaming基本使用】

Spark(十六)【SparkStreaming基本使用】

目錄

一. SparkStreaming簡介

1. 相關術語

流式資料: 指資料來源源不斷。

實時資料: 當前正在產生的資料。

離線資料: 過去(不是當下產生的)已經產生的資料。

實時計算: 理想上,實時計算一定是對實時資料的計算,理想期望立刻當前計算出結果(要在公司規定的時效範圍內)。

離線計算: 計算通常需要劃分一段時間。

總結:離線計算和實時計算主要通過計算的時效性進行區分,實時在不同的公司,有相對參考的標準。

2. SparkStreaming概念

SparkStreaming可以用來進行實時計算,Spark Streaming用於流式資料的處理,但是SparkStreaming是一個準(接近)實時計算的框架。

SparkStreaming在進行實時計算時,採用的是微批次(區別於流式)計算。

使用DStream作為最基本的資料抽象。DStream會將一段時間採集到的資料,封裝為一個RDD進行計算處理。

3. SparkStreaming架構

SparkStreaming程式在架構上整體分為兩塊

​ 資料接受模組: 啟動一個Excutor執行Reciever程式,Reciever程式會將指定時間間隔收到的一批資料,進行儲存,儲存後,將這批資料的id,傳送給Driver。

​ 資料處理模組(Driver): Driver端有RecieverTracer,不斷接受 Reciever傳送的已經收到的一批資料的ID,之後,通過JobGenerator,將這批資料,提交為一個Job,提交Job後,會啟動Excutor運算這批資料。這批資料在運算時,會有Reciever所在的Excutor傳送過來,執行結束後將結果返回給Driver。

4. 背壓機制

Spark Streaming可以動態控制資料接收速率來適配叢集資料處理能力。

背壓機制(即Spark Streaming Backpressure): 根據JobScheduler反饋作業的執行資訊來動態調整Receiver資料接收率。

把spark.streaming.backpressure.enabled 引數設定為ture,開啟背壓機制後Spark Streaming會根據延遲動態去kafka消費資料,上限由spark.streaming.kafka.maxRatePerPartition引數控制,所以兩個引數一般會一起使用。

二. Dstream入門

1. WordCount案例實操

需求:使用netcat工具向9999埠不斷的傳送資料,通過SparkStreaming讀取埠資料並統計不同單詞出現的次數。

① 新增pom依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

② 程式碼實現

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description: WordCount入門案例
 * @author: HaoWu
 * @create: 2020年08月10日
 */
object WordCountTest {
  def main(args: Array[String]): Unit = {

    //1.初始化Spark配置資訊
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化SparkStreamingContext,3秒統計一次,可以設定多個級別:Milliseconds,Seconds,Minutes
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3.通過監控埠建立DStream,讀進來的資料為一行行
    val lineStreams = ssc.socketTextStream("hadoop102", 9999)
    //4.處理DStream
    //將每一行資料做切分,形成一個個單詞
    val wordStreams = lineStreams.flatMap(_.split(" "))

    //將單詞對映成元組(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))

    //將相同的單詞次數做統計
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)

    //列印
    wordAndCountStreams.print()
     
    //5.啟動SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}

③在hadoop102節點啟動nc工具傳送資料,同時啟動SparkStreaming程式

nc -lk hadoop102 9999

結果

-------------------------------------------
Time: 1597053684000 ms
-------------------------------------------
(,1)
(as,1)
(fdaf,1)
(sa,1)

-------------------------------------------
Time: 1597053686000 ms
-------------------------------------------

-------------------------------------------
Time: 1597053688000 ms
-------------------------------------------

2. WordCount解析

Discretized Stream是Spark Streaming的基礎抽象,代表持續性的資料流和經過各種Spark原語操作後的結果資料流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的資料。

3. web UI

注意

SparkStream程式執行要啟動兩個執行緒,最少需要2個CPU,不然程式無法啟動。
Receiver、Driver各啟動一個excupu。本地測試的設定為“local[*]

三. Dstream建立

1. RDD佇列(測試使用)

測試過程中,可以通過使用ssc.queueStream(queueOfRDDs)來建立DStream,每一個推送到這個佇列中的RDD,都會作為一個DStream處理,測試使用驗證資料處理的邏輯

需求:迴圈建立幾個RDD,將RDD放入佇列。通過SparkStream建立Dstream,計算WordCount。

queueStream函式簽名

 def queueStream[T: ClassTag](
      queue: Queue[RDD[T]],   // 傳入的佇列
      oneAtATime: Boolean,  // 在一個週期內,是否只允許採集一個RDD
      defaultRDD: RDD[T]  // 佇列空了時,是否返回一個預設的RDD,可以設定為null,不返回
    ): InputDStream[T] = {
    new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
  }
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
 * @description: RDD佇列建立DStream
 * @author: HaoWu
 * @create: 2020年08月10日
 */
object WordCountSeqTest {
  def main(args: Array[String]): Unit = {
    //1.建立SparkStreamingContext
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDSeqApp")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
    //2.建立可變RDD佇列
    val que: mutable.Queue[RDD[String]] = new mutable.Queue[RDD[String]]()
    //3.建立DStream
    val dStream: InputDStream[String] = ssc.queueStream(que, oneAtATime = false)
    //4.DStream的邏輯處理
    val result: DStream[(String, Int)] = dStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
    //5.列印
    result.print(100)
    //6.執行程式
    ssc.start()
    val rdd = ssc.sparkContext.makeRDD(List("sada", "dafa", "adfafa", "fafda"))
    //7.往佇列中每一秒新增一個RDD
    println("Start啟動.....")
    for (i <- 1 to 10) {
      que.+=(rdd)
      Thread.sleep(1000)
    }
    ssc.awaitTermination()
  }
}

結果

Start啟動
-------------------------------------------
Time: 1597055400000 ms
-------------------------------------------
(dafa,1)
(fafda,1)
(adfafa,1)
(sada,1)

-------------------------------------------
Time: 1597055402000 ms
-------------------------------------------
(dafa,2)
(fafda,2)
(adfafa,2)
(sada,2)

2. 自定義資料來源

使用:需要繼承Receiver,並實現onStart、onStop方法來自定義資料來源採集。

繼承Receiver

/*
	StorageLevel: 資料儲存的級別!存記憶體,還是存磁碟等!
    T: 每次收的資料的型別
*/
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable

實現onStart方法

在收資料之前,執行一些指定的安裝操作

def onStart() {
	//1.在收資料時,onStart()不能被阻塞!
	//2.必須新開啟一個執行緒收資料!
	//3.收到資料後,可以呼叫store()來儲存資料!
      }

實現Onstop方法

在停止接收資料之前,清理元件

注意:在發生異常時,可以呼叫restart()重啟接收器,還可以呼叫stop()徹底停止收資料

需求:自定義資料來源,實現監控某個埠號,獲取該埠號內容。

程式碼

import java.io.{BufferedInputStream, BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver


class MyCustomReceiver(var hostname: String, var port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  var socket: Socket = null
  var reader: BufferedReader = null

  /**
   * 重寫onStart方法
   */
  override def onStart(): Unit = {
    //異常處理
    try {
      socket = new Socket(hostname, port)
    } catch {
      case e: ConnectException => {
        restart("重試~~~~");
        return
      }
    }
    println("Socket已經連線上~~~~~")
    //獲取reader
    reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    //開始接收資料
    recevie()
  }

  /**
   * 新建一個執行緒接收資料
   */
  def recevie(): Unit = {
    new Thread("Socket Receiver ThreadName") {
      //設定當前執行緒為守護執行緒    當前執行緒依附於 Receiver所在的main執行緒!
      // 如果一個JVM中,只有守護執行緒,JVM就會關閉!
      setDaemon(true)
      override def run(): Unit = {
        //異常處理
        try {
          println("開始接收:" + hostname + ":" + port + "  的資料")
          var line = reader.readLine()
          while (socket != null && line != null) {
            //儲存資料
            store(line)
            line = reader.readLine()
          }
        } catch {
          case e: Exception => e.getMessage
        } finally {
          onStop();
          restart("重啟Receiver~~~")
        }

      }
    }.start()
  }

  /**
   * 關閉資源
   */
  override def onStop(): Unit = {
    if (socket != null) {
      socket.close()
      socket = null
    }

    if (reader != null) {
      reader.close()
      reader = null
    }
  }
}

測試

object CostumReceiver extends {
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("CostumReceive")
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(2))
    //建立自定義Receiver
    val receiver: CostumeReceiver = new CostumeReceiver("hadoop102",9999)
    //建立DStream
    val dStream: ReceiverInputDStream[String] = ssc.receiverStream(receiver)
    val result = dStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print(100)
    ssc.start()
    ssc.awaitTermination()
  }
}

3. Kafka直連

好處

由Excutor直接去Kafka讀取資料,減少資料的網路IO傳輸!

Reciver只需要將一個採集週期採集的資料的元資料資訊,傳送給Excutor即可!

案例

pom依賴

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
     <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.1</version>
</dependency>

程式碼

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description: SparkStreaming直連消費Kafka資料
 * @author: HaoWu
 * @create: 2020年08月10日
 */
object SparkStreamingKafkaTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CostumReceive")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
    //設定消費kafka的引數,可以參考kafka.consumer.ConsumerConfig類中配置說明
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", //zookeeper的host,port
      "group.id" -> "g3", //消費者組
      "enable.auto.commit" -> "true", //是否自動提交
      "auto.commit.interval.ms" -> "500", //500ms自動提交offset
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "earliest"//第一次執行,從最初始偏移量開始消費資料
    )

    //使用工具類建立DStream,消費topic test1的資料
    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      //訂閱主題
      ConsumerStrategies.Subscribe[String, String](List("test1"),
        kafkaParams))

    //邏輯處理
    val result: DStream[(String, Int)] = ds.flatMap(record => record.value().split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print(100)
    //執行程式
    ssc.start()
    ssc.awaitTermination()
  }
}

測試

啟動zk叢集,kafka叢集,向test1主題新增資料

[root@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test1
>fasdf a
>asf as
>asf sa
實現資料零丟失

spark官網:sparkstreaming整合kafka

方法一:checkpoint實現

①取消基於時間的自動提交,改為手動提交

②在消費邏輯真正執行完後,再手動提交

Spark在手動取消offset提交後,允許設定一個checkpoint目錄,在程式崩潰之前,可以將崩潰時,程式的狀態(包含offset)儲存到目錄中!

在程式重啟後,可以選擇重建狀態!保證從之前未消費的位置繼續消費

缺點:小檔案,重建會啟動很多沒用的任務

程式碼實現

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
 * @description: 保證資料不丟失
 * @author: HaoWu
 * @create: 2020年08月10日
 */
object KafkaTest {
  def main(args: Array[String]): Unit = {
    /**
     *  程式異常重建SparkStreamingContext
     */
    def rebuild(): StreamingContext = {
      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("My app")
      val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
      //設定checkpoint目錄
      ssc.checkpoint("kafka")
      //TODO 消費引數配置
      val kafkaParams: Map[String, Object] = Map[String, Object](
        "bootstrap.servers" -> "hadoop102:9092",
        //      "client.id" -> "c4",
        "group.id" -> "g1",
        "enable.auto.commit" -> "false",
        "auto.commit.interval.ms" -> "500",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "auto.offset.reset" -> "earliest"
      )
      //TODO 消費資料穿建 DStream
      val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](List("test1"),
          kafkaParams))
      //TODO 消費邏輯
      val ds1: DStream[String] = ds.flatMap(record => record.value().split(" "))
      //模擬消費異常
      val result: DStream[(String, Int)] = ds1.map(x => {
//        if (x == "d") {
//          throw new UnknownError("程式異常~~~~~~~~~")
//        }
        (x, 1)
      }).reduceByKey(_ + _)
      //列印
      result.print(100)
      ssc
    }
    
    // 重建context  防止程序崩潰,程序崩潰後,重建程式
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("kafka", rebuild)
    //執行程式
    ssc.start()
    ssc.awaitTermination()
  }
}

方法二:手動提交offset

不丟資料,可能資料重複

四. DStream轉化 (API)

無狀態轉化:每個批次單獨處理自己批次中的的RDD。

有狀態轉化:跨批次之間的轉化,當前批次的RDD計算需要和之前的批次的結果做累加。

無狀態轉化

reduceByKey:只針對單個批次的RDD做轉化。

map:RDD的map操作

Transform

將當前批次的RDD[T] => RDD[U]

def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = 
    //轉換為RDD操作
    val ds1: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    //4.處理DStream
    val ds2: DStream[(String, Int)] = ds1.transform(rdd => {
      val value: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1))
      value
    })
雙流 join

可以實現雙流join,實質就是對2個流各個批次的RDD進行join

前提:兩個流的批次大小一致,DS中的元素必須是K-V結構,拉鍊操作

    //3.通過監控埠建立DStream,讀進來的資料為一行行
    val ds1: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    val ds2: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103", 8888)
    //4.處理DStream
    val ds11: DStream[(String, Int)] = ds1.flatMap(_.split(" ")).map((_, 1))
    val ds22: DStream[(String, String)] = ds2.flatMap(_.split(" ")).map((_, "aa"))
    //5.雙流join
    val result: DStream[(String, (Int, String))] = ds11.join(ds22)
    //列印
    result.print(100)

有狀態轉化(重要)

UpdateStateByKey

流計算中累加wordcount可以使用這個運算元

函式簽名

 //Seq[V]:當前批次的相同key的values集合
 //Option[S]:之前批次的結果,可以通過
 def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = 

案例:求截止到當前時間單詞的個數(wordcount)

/**
 * @description: **UpdateStateByKey**案例
 * @author: HaoWu
 * @create: 2020年08月10日
 */
object NoStatusTest {
  def main(args: Array[String]): Unit = {
    //1.初始化Spark配置資訊
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    //2.初始化SparkStreamingContext,3秒統計一次,可以設定多個級別:Milliseconds,Seconds,Minutes
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //設定checkpoint,儲存狀態
    ssc.checkpoint("./updatestate")
    //通過監控埠建立DStream,讀進來的資料為一行
    val ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    //轉化為K-V型別
    val ds1: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1))
    val result: DStream[(String, Int)] = ds1.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
      var sum: Int = seq.sum
      val value: Int = option.getOrElse(0)
      sum += value
      Some(sum)
    })
    //列印
    result.print(100)
    //5.啟動SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}

結果

-------------------------------------------
Time: 1597142208000 ms
-------------------------------------------
(a,7)
(b,3)

-------------------------------------------
Time: 1597142211000 ms
-------------------------------------------
(a,9)
(ab,1)
(b,4)

-------------------------------------------
Time: 1597142214000 ms
-------------------------------------------
(a,10)
(ab,2)
(b,5)

注意

①RDD是K-V

②updateFunc引數裡面引數宣告泛型[],返回結果用Some包裝

③設定checkpoint

WindowOperations 視窗

Window Operations可以設定視窗的大小和滑動視窗的間隔來動態的獲取當前Steaming的允許狀態。所有基於視窗的操作都需要兩個引數,分別為視窗時長以及滑動步長。

視窗時長:計算內容的時間範圍。

滑動步長:隔多久觸發一次計算。

注意:這兩者都必須為採集週期大小的整數倍

兩種實現

①每個視窗單獨統計視窗內部資料,每次滑動,重新計算(無狀態

  def reduceByWindow(
      //視窗內的歸約計算
      reduceFunc: (T, T) => T, 
      //視窗大小
      windowDuration: Duration,
      //步長
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
  }

②當前視窗和之前視窗有重疊,會使用之前的視窗的資料和當前視窗計算(有狀態

def reduceByKeyAndWindow(
    // old window 和新進入的values進行運算(上圖的視窗B綠色部分)
      reduceFunc: (V, V) => V,
    // old window和離開的values進行運算(上圖的視窗A的黃色部分)
      invReduceFunc: (V, V) => V,
    //視窗大小
      windowDuration: Duration,
    //步長
      slideDuration: Duration = self.slideDuration,
      numPartitions: Int = ssc.sc.defaultParallelism,
      filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = ssc.withScope {
    reduceByKeyAndWindow(
      reduceFunc, invReduceFunc, windowDuration,
      slideDuration, defaultPartitioner(numPartitions), filterFunc
    )
  }

案例:每間隔5分鐘,統計最近1h所有的單詞統計

實現一:無狀態

    //1.初始化Spark配置資訊
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    //2.初始化SparkStreamingContext,3秒統計一次,可以設定多個級別:Milliseconds,Seconds,Minutes
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //通過監控埠建立DStream,讀進來的資料為一行
    val ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    //轉化為K-V型別
    val ds1: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1))
    val result: DStream[(String, Int)] = ds1.reduceByKeyAndWindow((_ + _), windowDuration = Seconds(4), Seconds(2))
    //列印
    result.print(100)
    //5.啟動SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()

實現二:有狀態

需要設定檢查點

        //1.初始化Spark配置資訊
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    //2.初始化SparkStreamingContext,3秒統計一次,可以設定多個級別:Milliseconds,Seconds,Minutes
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //需要上一個window計算的結果,設定檢查點
    ssc.checkpoint("updateStateByKey1")
    // DS[String] :  輸入流中的每行資料
    val ds: ReceiverInputDStream[String] = context.socketTextStream("hadoop103", 3333)
    val result: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1))
      .reduceByKeyAndWindow((_+_),(_ - _),windowDuration=Seconds(4),filterFunc=_._2 != 0)
    result.print(100)
    //執行程式
    context.start()
    context.awaitTermination()
window視窗

定義DS的視窗,之後DS的運算元都是在視窗中運算

  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    new WindowedDStream(this, windowDuration, slideDuration)
  }
ds.window(視窗大小,滑動步長)

五. 程式優雅關閉

流式任務需要7*24小時執行,但是有時涉及到升級程式碼需要主動停止程式,但是分散式程式,沒辦法做到一個個程序去殺死,所有配置優雅的關閉就顯得至關重要了。使用外部檔案系統來控制內部程式關閉

MonitorStop類:啟動一個執行緒檢查是否停止程式

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

class MonitorStop(ssc: StreamingContext) extends Runnable {

  override def run(): Unit = {
    val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "root")

    while (true) {
      try
        Thread.sleep(5000)
      catch {
        case e: InterruptedException =>
          e.printStackTrace()
      }
      val state: StreamingContextState = ssc.getState
       // 讀取一個標記(資料庫,檔案系統)/應用程式/_stop
      val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
      if (bool) {
        if (state == StreamingContextState.ACTIVE) {
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}

SparkTest

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkTest {
  def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {

    val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {

      //當前批次內容的計算
      val sum: Int = values.sum

      //取出狀態資訊中上一次狀態
      val lastStatu: Int = status.getOrElse(0)

      Some(sum + lastStatu)
    }

    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")

    //設定優雅的關閉
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    ssc.checkpoint("./ck")

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)

    val word: DStream[String] = line.flatMap(_.split(" "))

    val wordAndOne: DStream[(String, Int)] = word.map((_, 1))

    val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)

    wordAndCount.print()

    ssc
  }

  def main(args: Array[String]): Unit = {

    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())

    new Thread(new MonitorStop(ssc)).start()
    ssc.start()
    ssc.awaitTermination()
  }
}

練手示例

  /*
      優雅地關閉
   */
  @Test
  def test5() : Unit ={

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("My app")

    val context = new StreamingContext(conf, Seconds(2))

    // DS[String] :  輸入流中的每行資料
    val ds: ReceiverInputDStream[String] = context.socketTextStream("hadoop103", 3333)

    val result: DStream[(String, Int)] = ds.window(Seconds(4),Seconds(2))
      .flatMap(_.split(" ")).map((_, 1))
      .reduceByKey(_+_)
    result.foreachRDD(rdd => println(rdd.collect().mkString(",")))
    //執行程式
    context.start()
	
    //啟動分執行緒,執行關閉
    new Thread(){

      //判斷是否需要關閉
      def ifShouldNotStop():Boolean={
          // 讀取一個標記(資料庫,檔案系統)/應用程式/_stop
          true
      }
      //關閉
      override def run(): Unit = {

        while(ifShouldNotStop()){
            Thread.sleep(5000)
        }

        // 關閉   stopGraceFully: 等收到的資料計算完成後再關閉
        context.stop(true,true)

      }
    }.start()

     // 當前執行緒阻塞,後續的程式碼都不會執行!
    context.awaitTermination()
  }
}