1. 程式人生 > >SparkStreaming視窗操作經典案例

SparkStreaming視窗操作經典案例

1.背景描述

  • 在社交網路(微博),電子商務(京東)、搜尋引擎(百度)、股票交易中人們關心的內容之一是我所關注的內容中,大家正在關注什麼
  • 在實際企業中非常有價值
  • 例如:我們關注過去30分鐘大家都在熱搜什麼?並且每5分鐘更新一次。要求列出來搜尋前三名的話題內容 2.原理圖 在這裡插入圖片描述 如圖所示,每當視窗滑過DStream時,落在視窗內的源RDD被組合並被執行操作以產生windowed DStream的RDD。在上面的例子中,操作應用於最近3個時間單位的資料,並以2個時間單位滑動。這表明任何視窗操作都需要指定兩個引數。  視窗長度(windowlength) - 視窗的時間長度(上圖的示例中為:15)。  滑動間隔(slidinginterval) - 兩次相鄰的視窗操作的間隔(即每次滑動的時間長度)(上圖示例中為:10)。 這兩個引數必須是源DStream的批間隔的倍數(上圖示例中為:5)。

3.程式碼

 問題:
  *       下述程式碼每隔20秒回重新計算之前60秒內的所有資料,如果視窗滑動時間間隔太短,那麼需要重新計算的資料就比較大,非常耗時
  *       怎麼理解呢?視窗滑動時間間隔短的話,與視窗長度的交集每次都必須重新計算,浪費資源,避免交集太大的話就應該設定滑動間隔長一點
  *    //第一個Seconds是視窗大小(3個RDD一共需要的時間),第二個Seconds是視窗間隔時間
  *       searchPair.reduceByKeyAndWindow((v1:Int, v2:Int) => v1+v2, (v1:Int, v2:Int) => v1-v2, Seconds(60), Seconds(20))
  *
object OnlineHotItems {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    //建立StreamingContext物件
    val sparkConf = new SparkConf().setAppName("OnlineHotItems").setMaster("local[2]")
    /**
      * 此處設定Batch Interval 是在Spark Streaming中生成基本Job的時間單位,視窗和滑動時間間隔必須是是該
      * Batch Interval的整數倍,如果不是收集資料的整數倍,就會報錯,因為時間不統一,資料就會出現不完整
      */
    val ssc = new StreamingContext(sparkConf,Seconds(5))
    //建立一個離散流,DStream代表輸入的資料流
    val hottestStream = ssc.socketTextStream("hadoop01",1234)

    /**
      * 使用者搜尋的格式簡化為item,time  在這裡我們由於要計算出熱點內容,所以只需要取出item即可
      * 提取出的item然後通過map轉換為(item,1)格式
      */
    val searchPair = hottestStream.map(_.split(",")(0)).filter(!_.isEmpty).map(item=>(item,1))
    val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int)=>v1+v2,Seconds(60),Seconds(20))
    val result: DStream[(String, Int)] = hottestDStream.transform(hottestRDD => {
      val top3: Array[(String, Int)] = hottestRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    })
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}