SparkStreaming視窗操作經典案例
阿新 • • 發佈:2018-12-14
1.背景描述
- 在社交網路(微博),電子商務(京東)、搜尋引擎(百度)、股票交易中人們關心的內容之一是我所關注的內容中,大家正在關注什麼
- 在實際企業中非常有價值
- 例如:我們關注過去30分鐘大家都在熱搜什麼?並且每5分鐘更新一次。要求列出來搜尋前三名的話題內容 2.原理圖 如圖所示,每當視窗滑過DStream時,落在視窗內的源RDD被組合並被執行操作以產生windowed DStream的RDD。在上面的例子中,操作應用於最近3個時間單位的資料,並以2個時間單位滑動。這表明任何視窗操作都需要指定兩個引數。 視窗長度(windowlength) - 視窗的時間長度(上圖的示例中為:15)。 滑動間隔(slidinginterval) - 兩次相鄰的視窗操作的間隔(即每次滑動的時間長度)(上圖示例中為:10)。 這兩個引數必須是源DStream的批間隔的倍數(上圖示例中為:5)。
3.程式碼
問題: * 下述程式碼每隔20秒回重新計算之前60秒內的所有資料,如果視窗滑動時間間隔太短,那麼需要重新計算的資料就比較大,非常耗時 * 怎麼理解呢?視窗滑動時間間隔短的話,與視窗長度的交集每次都必須重新計算,浪費資源,避免交集太大的話就應該設定滑動間隔長一點 * //第一個Seconds是視窗大小(3個RDD一共需要的時間),第二個Seconds是視窗間隔時間 * searchPair.reduceByKeyAndWindow((v1:Int, v2:Int) => v1+v2, (v1:Int, v2:Int) => v1-v2, Seconds(60), Seconds(20)) * object OnlineHotItems { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) //建立StreamingContext物件 val sparkConf = new SparkConf().setAppName("OnlineHotItems").setMaster("local[2]") /** * 此處設定Batch Interval 是在Spark Streaming中生成基本Job的時間單位,視窗和滑動時間間隔必須是是該 * Batch Interval的整數倍,如果不是收集資料的整數倍,就會報錯,因為時間不統一,資料就會出現不完整 */ val ssc = new StreamingContext(sparkConf,Seconds(5)) //建立一個離散流,DStream代表輸入的資料流 val hottestStream = ssc.socketTextStream("hadoop01",1234) /** * 使用者搜尋的格式簡化為item,time 在這裡我們由於要計算出熱點內容,所以只需要取出item即可 * 提取出的item然後通過map轉換為(item,1)格式 */ val searchPair = hottestStream.map(_.split(",")(0)).filter(!_.isEmpty).map(item=>(item,1)) val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int)=>v1+v2,Seconds(60),Seconds(20)) val result: DStream[(String, Int)] = hottestDStream.transform(hottestRDD => { val top3: Array[(String, Int)] = hottestRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).take(3) ssc.sparkContext.makeRDD(top3) }) result.print() ssc.start() ssc.awaitTermination() } }