[Repost] [Must-Know] Spark Streaming Window Operations in Practice
By 阿新 · Published 2021-07-21
Spark Streaming's window operations can be described with a diagram, as shown below.
[Figure: Spark Streaming window operation]
Window-based operations take two parameters:
- Window length (windowDuration): controls how many of the most recent batches each computation covers;
- Slide interval (slideDuration): controls how often the windowed DStream is computed.
Both must be integer multiples of the StreamingContext's batch interval (batchDuration).
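For example, with a 5-second batch interval (as in Example 1 below), a 20-second window sliding every 10 seconds covers the 4 most recent batches and is recomputed every 2 batches; Spark checks these multiples when the windowed DStream is defined. A minimal sketch (assuming a SparkConf named conf as in the demos below):

// batchDuration = 5s, so the 20s window and 10s slide below are valid multiples
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 1521)
val windowed = lines.window(Seconds(20), Seconds(10)) // 4 batches per window, recomputed every 2 batches
// lines.window(Seconds(12), Seconds(10)) // rejected: 12s is not a multiple of the 5s batch interval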
The examples below put these window operations into practice.
First, a small socket server that sends one number per second:
package cn.lagou.streaming

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

object SocketLikeNCWithWindow {
  def main(args: Array[String]): Unit = {
    val port = 1521
    val ss = new ServerSocket(port)
    val socket: Socket = ss.accept()
    println("connect to host : " + socket.getInetAddress)
    val out = new PrintWriter(socket.getOutputStream)
    var i = 0
    // send one number per second
    while (true) {
      i += 1
      out.println(i)
      out.flush()
      Thread.sleep(1000)
    }
  }
}
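Before wiring up Spark, the feed can be sanity-checked with any TCP client. A throwaway Scala probe (SocketProbe is a hypothetical helper, not part of the original post) that echoes whatever the server sends:

package cn.lagou.streaming

import java.net.Socket
import scala.io.Source

object SocketProbe {
  def main(args: Array[String]): Unit = {
    // connect to the number feed started above
    val socket = new Socket("localhost", 1521)
    // prints one number per second: 1, 2, 3, ...
    Source.fromInputStream(socket.getInputStream).getLines().foreach(println)
  }
}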
Example 1
Observe the data in each window and the relationship among batchDuration, windowDuration, and slideDuration, using the window-related operators. The code is as follows:
package cn.lagou.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]")
      .setAppName(this.getClass.getCanonicalName)
    // generate one RDD (mini-batch) every 5s
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 1521)
    // log each incoming batch so the batch/window relationship is visible
    lines.foreachRDD { (rdd, time) =>
      println(s"rdd = ${rdd.id}; time = $time")
      rdd.foreach(value => println(value))
    }
    // every 10s, concatenate the elements received in the last 20s
    val res1: DStream[String] = lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
    res1.print()
    // every 10s, a DStream holding the last 20s of data
    val res2: DStream[String] = lines.window(Seconds(20), Seconds(10))
    res2.print()
    // sum of the elements in the window
    val res3: DStream[Int] = lines.map(_.toInt).reduceByWindow(_ + _, Seconds(20), Seconds(10))
    res3.print()
    // the same sum, computed with window + reduce
    val res4 = res2.map(_.toInt).reduce(_ + _)
    res4.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
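Besides window and reduceByWindow, the DStream API also provides countByWindow, which counts the elements in each window. It is built on an incremental (reduce plus inverse-reduce) window internally, so it requires a checkpoint directory. A minimal sketch against the same number feed (the object name and checkpoint path are illustrative):

package cn.lagou.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CountByWindowDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]")
      .setAppName(this.getClass.getCanonicalName)
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    // countByWindow uses an inverse reduce internally, so checkpointing is required
    ssc.checkpoint("data/checkpoint-count/")
    val lines = ssc.socketTextStream("localhost", 1521)
    // every 10s: how many lines arrived in the last 20s (about 20 with the feed above)
    val counts = lines.countByWindow(Seconds(20), Seconds(10))
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}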
Example 2
Real-time statistics of hot search words: every 10 seconds, count how many times each word appeared in the last 20 seconds. The code is as follows:
package cn.lagou.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HotWordStats {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]")
      .setAppName(this.getClass.getCanonicalName)
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")
    // set a checkpoint directory for fault tolerance; in production it should live on HDFS
    ssc.checkpoint("data/checkpoint/")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
    // reduceByKeyAndWindow: every 10 seconds, count word occurrences over the last 20 seconds
    // last 3 arguments: window length, slide interval, number of partitions
    val wordCounts1: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b, Seconds(20), Seconds(10), 2)
    wordCounts1.print()
    // the incremental variant (reduce + inverse reduce) requires the checkpoint set above
    val wordCounts2: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_ + _, _ - _,
      Seconds(20), Seconds(10), 2)
    wordCounts2.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
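One caveat with the incremental wordCounts2 above: keys whose count has dropped to 0 linger in the window state. The same reduceByKeyAndWindow API accepts an optional filter function as a sixth argument that evicts such keys. A drop-in sketch (wordCounts3 is an illustrative name) that could replace wordCounts2:

// add counts entering the window, subtract counts leaving it,
// and keep only keys that still occur in the current window
val wordCounts3: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a - b,
  Seconds(20), Seconds(10), 2,
  (kv: (String, Int)) => kv._2 > 0)
wordCounts3.print()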