
Spark Streaming window operations

Tags: window, spark, big data

A window operation works on a DStream: a window of a configurable length slides forward over the stream at a configurable rate (the slide interval), and for each position of the window the chosen operator is applied to the batch of data currently inside it.

Note that both the window length and the slide interval must be integer multiples of the batch interval.
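A minimal sketch (not from the original post) that only illustrates this constraint, using a hypothetical socket source instead of the Kafka source used below: with a 2-second batch interval, a window length of 8 seconds and a slide interval of 4 seconds are both valid because they are multiples of 2.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("windowSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2)) // batch interval: 2 seconds
    // Hypothetical socket source, used here only to show the window parameters
    val lines = ssc.socketTextStream("localhost", 9999)
    // Window length 8s, slide interval 4s: both are integer multiples of the 2s batch interval
    val windowed = lines.window(Seconds(8), Seconds(4))
    windowed.print()
    ssc.start()
    ssc.awaitTermination()
  }
}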

1.window(windowLength, slideInterval)

This operation is called on a DStream, taking a window length and a slide interval as parameters; it takes the elements that fall inside the current window and returns them as a new DStream.

object SparkWindowDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("kafaksource").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2)) // batch interval: 2 seconds
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.195.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG, "KafkaGroup2")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]]
    = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
    )

    val numStream: DStream[(String, Int)] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      .map((_, 1))
      .window(Seconds(8), Seconds(4)) // window length 8s, slide interval 4s, both multiples of the batch interval
    numStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Start Kafka, produce some messages, and test:

kafka-console-producer.sh --topic sparkKafkaDemo --broker-list 192.168.195.20:9092

2. countByWindow(windowLength,slideInterval)

Returns the number of elements in the window of the specified length.

Note: checkpointing must be enabled.

object SparkWindowDemo2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("kafaksource").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2)) // batch interval: 2 seconds
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.195.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG, "KafkaGroup2")
    )

    val kafkaStream:InputDStream[ConsumerRecord[String,String]]
    = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
    )

    val numStream: DStream[Long] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      .map((_, 1))
      .countByWindow(Seconds(8), Seconds(4))
      // window length 8s, slide interval 4s; both must be multiples of the batch interval
    numStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

As before, produce messages with Kafka to test.

3. countByValueAndWindow(windowLength,slideInterval, [numTasks])

Counts, for each distinct value, how many elements with that value fall inside the current window. Each element itself acts as the key, so no map((_, 1)) step is needed.

Note: checkpointing must be enabled.

The setup code, identical to the example above, is omitted.
    val numStream: DStream[(String, Long)]
      = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      .countByValueAndWindow(Seconds(8), Seconds(4))

4. reduceByWindow(func, windowLength,slideInterval)

On the calling DStream, the elements of the current window are first collected into a new DStream, and a reduce is then applied to those window elements. In the example below the words in the window are concatenated into a single string separated by ':'.

 val numStream: DStream[String] = kafkaStream
      .flatMap(line => line.value().toString.split("\\s+"))
      .reduceByWindow(_ + ":" + _, Seconds(8), Seconds(2))

5.reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

reduceByKeyAndWindow computes over all of the data that falls inside the window of the calling DStream; it also accepts an optional parallelism (numTasks) argument. The example below uses the two-function variant: the second function (x - y) incrementally "subtracts" the batches that slide out of the window, which is more efficient for long windows but requires checkpointing.

val numStream: DStream[(String, Int)] = kafkaStream.flatMap(line => line.value().toString.split("\\s+"))
      .map((_, 1))
      .reduceByKeyAndWindow((x: Int, y: Int) => {
        x + y
      }, (x: Int, y: Int) => {
        x - y
      }, Seconds(8), Seconds(2))
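
For comparison, a minimal sketch of the simpler variant that omits the inverse function (assuming the same kafkaStream as in the snippet above):

    val simpleStream: DStream[(String, Int)] = kafkaStream
      .flatMap(line => line.value().toString.split("\\s+"))
      .map((_, 1))
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(8), Seconds(2))
    simpleStream.print()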

6.transform

transform exposes the underlying RDD of every batch, so RDD operations that have no DStream counterpart (such as sortBy) can be used; the variant below also receives the batch timestamp, which is attached to every word before counting.

 val numStream: DStream[((String, String), Int)] = kafkaStream.transform((rdd, timestamp) => {
      val format = new SimpleDateFormat("yyyyMMdd HH:mm:ss")
      val time: String = format.format(timestamp.milliseconds)
      val value: RDD[((String, String), Int)] = rdd
        .flatMap(x => x.value().split("\\s+")).map(x => ((x, time), 1))
        .reduceByKey((x,y)=>x+y)
          .sortBy(x=>x._2,ascending = false)

      value
    })

7. Spark SQL + Spark Streaming transform

Inside transform, the batch RDD can be converted to a DataFrame and queried with Spark SQL; a lazily created singleton SQLContext is reused across batches.

object SparkWindowDemo7 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("kafaksource").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(4)) // batch interval: 4 seconds
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.195.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG, "KafkaGroup5")
    )

    val kafkaStream:InputDStream[ConsumerRecord[String,String]]
    = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
    )
    val numStream: DStream[Row] = kafkaStream.transform(rdd => {
      val sQLContext: SQLContext = SQLContextSingletom.getInstance(rdd.sparkContext)
      import sQLContext.implicits._
      val words: RDD[String] = rdd.flatMap(_.value().toString.split("\\s+"))
      val tuple2RDD: RDD[(String, Int)] = words.map((_, 1))
      tuple2RDD.toDF("name", "cn")
        .createOrReplaceTempView("tbwordcount")

      val frame: DataFrame = sQLContext
        .sql("select name,count(cn)from tbwordcount group by name")

      frame.rdd
    })

    numStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}


object SQLContextSingletom {
  // Lazily created once and reused across batches instead of building a new SQLContext per micro-batch
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    synchronized {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
    }
    instance
  }
}

8. Print where the driver and executor code runs, to understand how it works

    // Approach 1
//    println("driver")
//    val numStream: DStream[String] = kafkaStream.flatMap(line => {
//      println("executor")
//      line.value().toString.split("\\s+")
//    })

    // Approach 2
//    println("driver") // runs only once, on the driver
//    val numStream: DStream[String] = kafkaStream
//      .transform((rdd) => {
//        println("bb") //不管有沒有資料,每個採集週期都會執行一次
//      println(rdd)
//      val value: RDD[String] = rdd.flatMap(
//        x => {
//          println("cc") //只有採集到資料,才會執行
//          x.value().split("\\s+")}
//      )
//      value
//    })


    // Approach 3
    println("driver")
    kafkaStream.foreachRDD(
      rdd => {
        println("bb")
        rdd.foreach(
          // foreachPartition could be used instead; it is generally the better choice in production,
          // but the downside is that it can more easily cause memory overflow and crash the process
          x => {
            println("cc")
            val strings: Array[String] = x.value().toString.split("\\s+")
            println(strings.toList)
          }
        )
      }
    )