1. 程式人生 > 其它 >【轉載】【必會】SparkStreaming的視窗操作及實戰

【轉載】【必會】SparkStreaming的視窗操作及實戰

Window Operations(視窗操作)可以設定視窗大小和滑動視窗間隔來動態的獲取當前Streaming的狀態。基於視窗的操作會在一個比 StreamingContext 的 batchDuration(批次間隔)更長的時間範圍內,通過整合多個批次的結果,計算出整個視窗的結果。

下面,通過一張圖來描述SparkStreaming的視窗操作,如圖所示。

基於視窗的操作需要兩個引數,如下:

  • 視窗長度(windowDuration),控制每次計算最近的多少個批次的資料;

  • 滑動間隔(slideDuration),用來控制對新的 DStream 進行計算的間隔。

兩者都必須是 StreamingContext 中批次間隔(batchDuration)的整數倍。

使用視窗操作,即使用視窗操作進行實戰。

每秒傳送1個數字

package cn.lagou.streaming
import java.io.PrintWriter
import java.net.{ServerSocket, Socket}
object SocketLikeNCWithWindow {
    def main(args: Array[String]): Unit = {
        val port = 1521
        val ss = new ServerSocket(port)
        val socket: Socket = ss.accept()
        println("connect to host : " + socket.getInetAddress)
        var i = 0
        // 每秒傳送1個數
        while(true) {
            i += 1
            val out = new PrintWriter(socket.getOutputStream)
            out.println(i)
            out.flush()
            Thread.sleep(1000)
        }
    }
}

案例一

觀察視窗的資料;觀察 batchDuration、windowDuration、slideDuration 三者之間的關係;使用視窗相關的操作,具體程式碼演示如下:

package cn.lagou.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]")
                                  .setAppName(this.getClass.getCanonicalName)
        // 每 5s 生成一個RDD(mini-batch)
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.sparkContext.setLogLevel("error")
        val lines: ReceiverInputDStream[String] =
        ssc.socketTextStream("localhost", 1521)
        lines.foreachRDD{ (rdd, time) =>println(s"rdd = ${rdd.id}; time = $time")
            rdd.foreach(value => println(value))
        }
        val res1: DStream[String] =lines.reduceByWindow(_ + " " + _,Seconds(20), Seconds(10))
        res1.print()
        val res2: DStream[String] = lines.window(Seconds(20),Seconds(10))
        res2.print()
        // 求視窗元素的和
        val res3:DStream[Int]=lines.map(_.toInt).reduceByWindow(_+_,Seconds(20), Seconds(10))
        res3.print()
        // 求視窗元素的和
        val res4 = res2.map(_.toInt).reduce(_+_)
        res4.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

案例二

熱點搜尋詞實時統計。每隔 10 秒,統計最近20秒的詞出現的次數,具體程式碼演示如下:

package cn.lagou.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object HotWordStats {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[2]")
                              .setAppName(this.getClass.getCanonicalName)
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.sparkContext.setLogLevel("ERROR")
        //設定檢查點,檢查點具有容錯機制。生產環境中應設定到HDFS
        ssc.checkpoint("data/checkpoint/")
        val lines: ReceiverInputDStream[String] =ssc.socketTextStream("localhost", 9999)
        val words: DStream[String] = lines.flatMap(_.split("\\s+"))
        val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
        // 通過reduceByKeyAndWindow運算元, 每隔10秒統計最近20秒的詞出現的次數
        // 後 3個引數:視窗時間長度、滑動視窗時間、分割槽
        val wordCounts1: DStream[(String, Int)] =pairs.reduceByKeyAndWindow(
                                    (a: Int, b: Int) => a + b,Seconds(20),Seconds(10), 2)
        wordCounts1.print
        // 這裡需要checkpoint的支援
        val wordCounts2: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_ + _,_ - _,
                                                  Seconds(20),Seconds(10), 2)
        wordCounts2.print
        ssc.start()
        ssc.awaitTermination()
    }
}