Spark(十六)【SparkStreaming基本使用】
目錄
一. SparkStreaming簡介
1. 相關術語
流式資料: 指資料來源源不斷。
實時資料: 當前正在產生的資料。
離線資料: 過去(不是當下產生的)已經產生的資料。
實時計算: 理想上,實時計算一定是對實時資料的計算,理想期望立刻當前計算出結果(要在公司規定的時效範圍內)。
離線計算: 計算通常需要劃分一段時間。
總結:離線計算和實時計算主要通過計算的時效性進行區分,實時在不同的公司,有相對參考的標準。
2. SparkStreaming概念
SparkStreaming可以用來進行實時計算,Spark Streaming用於流式資料的處理,但是SparkStreaming是一個準(接近)實時計算的框架。
SparkStreaming在進行實時計算時,採用的是微批次(區別於流式)計算。
使用DStream作為最基本的資料抽象。DStream會將一段時間採集到的資料,封裝為一個RDD進行計算處理。
3. SparkStreaming架構
SparkStreaming程式在架構上整體分為兩塊
資料接受模組: 啟動一個Excutor執行Reciever程式,Reciever程式會將指定時間間隔收到的一批資料,進行儲存,儲存後,將這批資料的id,傳送給Driver。
資料處理模組(Driver): Driver端有RecieverTracer,不斷接受 Reciever傳送的已經收到的一批資料的ID,之後,通過JobGenerator,將這批資料,提交為一個Job,提交Job後,會啟動Excutor運算這批資料。這批資料在運算時,會有Reciever所在的Excutor傳送過來,執行結束後將結果返回給Driver。
4. 背壓機制
Spark Streaming可以動態控制資料接收速率來適配叢集資料處理能力。
背壓機制(即Spark Streaming Backpressure): 根據JobScheduler反饋作業的執行資訊來動態調整Receiver資料接收率。
把spark.streaming.backpressure.enabled 引數設定為ture,開啟背壓機制後Spark Streaming會根據延遲動態去kafka消費資料,上限由spark.streaming.kafka.maxRatePerPartition引數控制,所以兩個引數一般會一起使用。
二. Dstream入門
1. WordCount案例實操
需求:使用netcat工具向9999埠不斷的傳送資料,通過SparkStreaming讀取埠資料並統計不同單詞出現的次數。
① 新增pom依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
② 程式碼實現
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description: WordCount入門案例
* @author: HaoWu
* @create: 2020年08月10日
*/
object WordCountTest {
def main(args: Array[String]): Unit = {
//1.初始化Spark配置資訊
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
//2.初始化SparkStreamingContext,3秒統計一次,可以設定多個級別:Milliseconds,Seconds,Minutes
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3.通過監控埠建立DStream,讀進來的資料為一行行
val lineStreams = ssc.socketTextStream("hadoop102", 9999)
//4.處理DStream
//將每一行資料做切分,形成一個個單詞
val wordStreams = lineStreams.flatMap(_.split(" "))
//將單詞對映成元組(word,1)
val wordAndOneStreams = wordStreams.map((_, 1))
//將相同的單詞次數做統計
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
//列印
wordAndCountStreams.print()
//5.啟動SparkStreamingContext
ssc.start()
ssc.awaitTermination()
}
}
③在hadoop102節點啟動nc工具傳送資料,同時啟動SparkStreaming程式
nc -lk hadoop102 9999
結果
-------------------------------------------
Time: 1597053684000 ms
-------------------------------------------
(,1)
(as,1)
(fdaf,1)
(sa,1)
-------------------------------------------
Time: 1597053686000 ms
-------------------------------------------
-------------------------------------------
Time: 1597053688000 ms
-------------------------------------------
2. WordCount解析
Discretized Stream是Spark Streaming的基礎抽象,代表持續性的資料流和經過各種Spark原語操作後的結果資料流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的資料。
3. web UI
注意
SparkStream程式執行要啟動兩個執行緒,最少需要2個CPU,不然程式無法啟動。
Receiver、Driver各啟動一個excupu。本地測試的設定為“local[*]”
三. Dstream建立
1. RDD佇列(測試使用)
測試過程中,可以通過使用ssc.queueStream(queueOfRDDs)來建立DStream,每一個推送到這個佇列中的RDD,都會作為一個DStream處理,測試使用驗證資料處理的邏輯
需求:迴圈建立幾個RDD,將RDD放入佇列。通過SparkStream建立Dstream,計算WordCount。
queueStream函式簽名
def queueStream[T: ClassTag](
queue: Queue[RDD[T]], // 傳入的佇列
oneAtATime: Boolean, // 在一個週期內,是否只允許採集一個RDD
defaultRDD: RDD[T] // 佇列空了時,是否返回一個預設的RDD,可以設定為null,不返回
): InputDStream[T] = {
new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* @description: RDD佇列建立DStream
* @author: HaoWu
* @create: 2020年08月10日
*/
object WordCountSeqTest {
def main(args: Array[String]): Unit = {
//1.建立SparkStreamingContext
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDSeqApp")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
//2.建立可變RDD佇列
val que: mutable.Queue[RDD[String]] = new mutable.Queue[RDD[String]]()
//3.建立DStream
val dStream: InputDStream[String] = ssc.queueStream(que, oneAtATime = false)
//4.DStream的邏輯處理
val result: DStream[(String, Int)] = dStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
//5.列印
result.print(100)
//6.執行程式
ssc.start()
val rdd = ssc.sparkContext.makeRDD(List("sada", "dafa", "adfafa", "fafda"))
//7.往佇列中每一秒新增一個RDD
println("Start啟動.....")
for (i <- 1 to 10) {
que.+=(rdd)
Thread.sleep(1000)
}
ssc.awaitTermination()
}
}
結果
Start啟動
-------------------------------------------
Time: 1597055400000 ms
-------------------------------------------
(dafa,1)
(fafda,1)
(adfafa,1)
(sada,1)
-------------------------------------------
Time: 1597055402000 ms
-------------------------------------------
(dafa,2)
(fafda,2)
(adfafa,2)
(sada,2)
2. 自定義資料來源
使用:需要繼承Receiver,並實現onStart、onStop方法來自定義資料來源採集。
繼承Receiver
/*
StorageLevel: 資料儲存的級別!存記憶體,還是存磁碟等!
T: 每次收的資料的型別
*/
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
實現onStart方法
在收資料之前,執行一些指定的安裝操作
def onStart() {
//1.在收資料時,onStart()不能被阻塞!
//2.必須新開啟一個執行緒收資料!
//3.收到資料後,可以呼叫store()來儲存資料!
}
實現Onstop方法
在停止接收資料之前,清理元件
注意:在發生異常時,可以呼叫restart()重啟接收器,還可以呼叫stop()徹底停止收資料
需求:自定義資料來源,實現監控某個埠號,獲取該埠號內容。
程式碼
import java.io.{BufferedInputStream, BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
class MyCustomReceiver(var hostname: String, var port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
var socket: Socket = null
var reader: BufferedReader = null
/**
* 重寫onStart方法
*/
override def onStart(): Unit = {
//異常處理
try {
socket = new Socket(hostname, port)
} catch {
case e: ConnectException => {
restart("重試~~~~");
return
}
}
println("Socket已經連線上~~~~~")
//獲取reader
reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
//開始接收資料
recevie()
}
/**
* 新建一個執行緒接收資料
*/
def recevie(): Unit = {
new Thread("Socket Receiver ThreadName") {
//設定當前執行緒為守護執行緒 當前執行緒依附於 Receiver所在的main執行緒!
// 如果一個JVM中,只有守護執行緒,JVM就會關閉!
setDaemon(true)
override def run(): Unit = {
//異常處理
try {
println("開始接收:" + hostname + ":" + port + " 的資料")
var line = reader.readLine()
while (socket != null && line != null) {
//儲存資料
store(line)
line = reader.readLine()
}
} catch {
case e: Exception => e.getMessage
} finally {
onStop();
restart("重啟Receiver~~~")
}
}
}.start()
}
/**
* 關閉資源
*/
override def onStop(): Unit = {
if (socket != null) {
socket.close()
socket = null
}
if (reader != null) {
reader.close()
reader = null
}
}
}
測試
object CostumReceiver extends {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("CostumReceive")
val ssc: StreamingContext = new StreamingContext(conf,Seconds(2))
//建立自定義Receiver
val receiver: CostumeReceiver = new CostumeReceiver("hadoop102",9999)
//建立DStream
val dStream: ReceiverInputDStream[String] = ssc.receiverStream(receiver)
val result = dStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
result.print(100)
ssc.start()
ssc.awaitTermination()
}
}
3. Kafka直連
好處
由Excutor直接去Kafka讀取資料,減少資料的網路IO傳輸!
Reciver只需要將一個採集週期採集的資料的元資料資訊,傳送給Excutor即可!
案例
pom依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
程式碼
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description: SparkStreaming直連消費Kafka資料
* @author: HaoWu
* @create: 2020年08月10日
*/
object SparkStreamingKafkaTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CostumReceive")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
//設定消費kafka的引數,可以參考kafka.consumer.ConsumerConfig類中配置說明
val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", //zookeeper的host,port
"group.id" -> "g3", //消費者組
"enable.auto.commit" -> "true", //是否自動提交
"auto.commit.interval.ms" -> "500", //500ms自動提交offset
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"auto.offset.reset" -> "earliest"//第一次執行,從最初始偏移量開始消費資料
)
//使用工具類建立DStream,消費topic test1的資料
val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
//訂閱主題
ConsumerStrategies.Subscribe[String, String](List("test1"),
kafkaParams))
//邏輯處理
val result: DStream[(String, Int)] = ds.flatMap(record => record.value().split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print(100)
//執行程式
ssc.start()
ssc.awaitTermination()
}
}
測試
啟動zk叢集,kafka叢集,向test1主題新增資料
[root@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test1
>fasdf a
>asf as
>asf sa
實現資料零丟失
spark官網:sparkstreaming整合kafka
方法一:checkpoint實現
①取消基於時間的自動提交,改為手動提交
②在消費邏輯真正執行完後,再手動提交
Spark在手動取消offset提交後,允許設定一個checkpoint目錄,在程式崩潰之前,可以將崩潰時,程式的狀態(包含offset)儲存到目錄中!
在程式重啟後,可以選擇重建狀態!保證從之前未消費的位置繼續消費
缺點:小檔案,重建會啟動很多沒用的任務
程式碼實現
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
/**
* @description: 保證資料不丟失
* @author: HaoWu
* @create: 2020年08月10日
*/
object KafkaTest {
def main(args: Array[String]): Unit = {
/**
* 程式異常重建SparkStreamingContext
*/
def rebuild(): StreamingContext = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("My app")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
//設定checkpoint目錄
ssc.checkpoint("kafka")
//TODO 消費引數配置
val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "hadoop102:9092",
// "client.id" -> "c4",
"group.id" -> "g1",
"enable.auto.commit" -> "false",
"auto.commit.interval.ms" -> "500",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"auto.offset.reset" -> "earliest"
)
//TODO 消費資料穿建 DStream
val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](List("test1"),
kafkaParams))
//TODO 消費邏輯
val ds1: DStream[String] = ds.flatMap(record => record.value().split(" "))
//模擬消費異常
val result: DStream[(String, Int)] = ds1.map(x => {
// if (x == "d") {
// throw new UnknownError("程式異常~~~~~~~~~")
// }
(x, 1)
}).reduceByKey(_ + _)
//列印
result.print(100)
ssc
}
// 重建context 防止程序崩潰,程序崩潰後,重建程式
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("kafka", rebuild)
//執行程式
ssc.start()
ssc.awaitTermination()
}
}
方法二:手動提交offset
不丟資料,可能資料重複
四. DStream轉化 (API)
無狀態轉化:每個批次單獨處理自己批次中的的RDD。
有狀態轉化:跨批次之間的轉化,當前批次的RDD計算需要和之前的批次的結果做累加。
無狀態轉化
reduceByKey:只針對單個批次的RDD做轉化。
map:RDD的map操作
Transform
將當前批次的RDD[T] => RDD[U]
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] =
//轉換為RDD操作
val ds1: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
//4.處理DStream
val ds2: DStream[(String, Int)] = ds1.transform(rdd => {
val value: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1))
value
})
雙流 join
可以實現雙流join,實質就是對2個流各個批次的RDD進行join
前提:兩個流的批次大小一致,DS中的元素必須是K-V結構,拉鍊操作
//3.通過監控埠建立DStream,讀進來的資料為一行行
val ds1: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
val ds2: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103", 8888)
//4.處理DStream
val ds11: DStream[(String, Int)] = ds1.flatMap(_.split(" ")).map((_, 1))
val ds22: DStream[(String, String)] = ds2.flatMap(_.split(" ")).map((_, "aa"))
//5.雙流join
val result: DStream[(String, (Int, String))] = ds11.join(ds22)
//列印
result.print(100)
有狀態轉化(重要)
UpdateStateByKey
流計算中累加wordcount可以使用這個運算元
函式簽名
//Seq[V]:當前批次的相同key的values集合
//Option[S]:之前批次的結果,可以通過
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] =
案例:求截止到當前時間單詞的個數(wordcount)
/**
* @description: **UpdateStateByKey**案例
* @author: HaoWu
* @create: 2020年08月10日
*/
object NoStatusTest {
def main(args: Array[String]): Unit = {
//1.初始化Spark配置資訊
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
//2.初始化SparkStreamingContext,3秒統計一次,可以設定多個級別:Milliseconds,Seconds,Minutes
val ssc = new StreamingContext(sparkConf, Seconds(3))
//設定checkpoint,儲存狀態
ssc.checkpoint("./updatestate")
//通過監控埠建立DStream,讀進來的資料為一行
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
//轉化為K-V型別
val ds1: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1))
val result: DStream[(String, Int)] = ds1.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
var sum: Int = seq.sum
val value: Int = option.getOrElse(0)
sum += value
Some(sum)
})
//列印
result.print(100)
//5.啟動SparkStreamingContext
ssc.start()
ssc.awaitTermination()
}
}
結果
-------------------------------------------
Time: 1597142208000 ms
-------------------------------------------
(a,7)
(b,3)
-------------------------------------------
Time: 1597142211000 ms
-------------------------------------------
(a,9)
(ab,1)
(b,4)
-------------------------------------------
Time: 1597142214000 ms
-------------------------------------------
(a,10)
(ab,2)
(b,5)
注意:
①RDD是K-V
②updateFunc引數裡面引數宣告泛型[],返回結果用Some包裝
③設定checkpoint
WindowOperations 視窗
Window Operations可以設定視窗的大小和滑動視窗的間隔來動態的獲取當前Steaming的允許狀態。所有基於視窗的操作都需要兩個引數,分別為視窗時長以及滑動步長。
視窗時長:計算內容的時間範圍。
滑動步長:隔多久觸發一次計算。
注意:這兩者都必須為採集週期大小的整數倍。
兩種實現
①每個視窗單獨統計視窗內部資料,每次滑動,重新計算(無狀態)
def reduceByWindow(
//視窗內的歸約計算
reduceFunc: (T, T) => T,
//視窗大小
windowDuration: Duration,
//步長
slideDuration: Duration
): DStream[T] = ssc.withScope {
this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
②當前視窗和之前視窗有重疊,會使用之前的視窗的資料和當前視窗計算(有狀態)
def reduceByKeyAndWindow(
// old window 和新進入的values進行運算(上圖的視窗B綠色部分)
reduceFunc: (V, V) => V,
// old window和離開的values進行運算(上圖的視窗A的黃色部分)
invReduceFunc: (V, V) => V,
//視窗大小
windowDuration: Duration,
//步長
slideDuration: Duration = self.slideDuration,
numPartitions: Int = ssc.sc.defaultParallelism,
filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowDuration,
slideDuration, defaultPartitioner(numPartitions), filterFunc
)
}
案例:每間隔5分鐘,統計最近1h所有的單詞統計
實現一:無狀態
//1.初始化Spark配置資訊
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
//2.初始化SparkStreamingContext,3秒統計一次,可以設定多個級別:Milliseconds,Seconds,Minutes
val ssc = new StreamingContext(sparkConf, Seconds(3))
//通過監控埠建立DStream,讀進來的資料為一行
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
//轉化為K-V型別
val ds1: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1))
val result: DStream[(String, Int)] = ds1.reduceByKeyAndWindow((_ + _), windowDuration = Seconds(4), Seconds(2))
//列印
result.print(100)
//5.啟動SparkStreamingContext
ssc.start()
ssc.awaitTermination()
實現二:有狀態
需要設定檢查點
//1.初始化Spark配置資訊
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
//2.初始化SparkStreamingContext,3秒統計一次,可以設定多個級別:Milliseconds,Seconds,Minutes
val ssc = new StreamingContext(sparkConf, Seconds(3))
//需要上一個window計算的結果,設定檢查點
ssc.checkpoint("updateStateByKey1")
// DS[String] : 輸入流中的每行資料
val ds: ReceiverInputDStream[String] = context.socketTextStream("hadoop103", 3333)
val result: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1))
.reduceByKeyAndWindow((_+_),(_ - _),windowDuration=Seconds(4),filterFunc=_._2 != 0)
result.print(100)
//執行程式
context.start()
context.awaitTermination()
window視窗
定義DS的視窗,之後DS的運算元都是在視窗中運算
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
new WindowedDStream(this, windowDuration, slideDuration)
}
ds.window(視窗大小,滑動步長)
五. 程式優雅關閉
流式任務需要7*24小時執行,但是有時涉及到升級程式碼需要主動停止程式,但是分散式程式,沒辦法做到一個個程序去殺死,所有配置優雅的關閉就顯得至關重要了。使用外部檔案系統來控制內部程式關閉
MonitorStop類:啟動一個執行緒檢查是否停止程式
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
class MonitorStop(ssc: StreamingContext) extends Runnable {
override def run(): Unit = {
val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "root")
while (true) {
try
Thread.sleep(5000)
catch {
case e: InterruptedException =>
e.printStackTrace()
}
val state: StreamingContextState = ssc.getState
// 讀取一個標記(資料庫,檔案系統)/應用程式/_stop
val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
if (bool) {
if (state == StreamingContextState.ACTIVE) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
System.exit(0)
}
}
}
}
}
SparkTest
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkTest {
def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {
//當前批次內容的計算
val sum: Int = values.sum
//取出狀態資訊中上一次狀態
val lastStatu: Int = status.getOrElse(0)
Some(sum + lastStatu)
}
val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")
//設定優雅的關閉
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("./ck")
val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
val word: DStream[String] = line.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
wordAndCount.print()
ssc
}
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
new Thread(new MonitorStop(ssc)).start()
ssc.start()
ssc.awaitTermination()
}
}
練手示例
/*
優雅地關閉
*/
@Test
def test5() : Unit ={
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("My app")
val context = new StreamingContext(conf, Seconds(2))
// DS[String] : 輸入流中的每行資料
val ds: ReceiverInputDStream[String] = context.socketTextStream("hadoop103", 3333)
val result: DStream[(String, Int)] = ds.window(Seconds(4),Seconds(2))
.flatMap(_.split(" ")).map((_, 1))
.reduceByKey(_+_)
result.foreachRDD(rdd => println(rdd.collect().mkString(",")))
//執行程式
context.start()
//啟動分執行緒,執行關閉
new Thread(){
//判斷是否需要關閉
def ifShouldNotStop():Boolean={
// 讀取一個標記(資料庫,檔案系統)/應用程式/_stop
true
}
//關閉
override def run(): Unit = {
while(ifShouldNotStop()){
Thread.sleep(5000)
}
// 關閉 stopGraceFully: 等收到的資料計算完成後再關閉
context.stop(true,true)
}
}.start()
// 當前執行緒阻塞,後續的程式碼都不會執行!
context.awaitTermination()
}
}