SparkStreaming視窗函式的應用
阿新 • • 發佈:2018-12-27
package windon import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 背景描述 * 在社交網(微博),電子商務(淘寶),搜尋引擎(百度),股票交易中人們最關係的內容 * 大家在某段時間中關注的是什麼 * 這種資料,在企業中非常有價值 * * 例如:我們關注的過去30分鐘大家的熱搜是什麼? * 需求:我們需要沒20秒鐘統計一次過去60秒的熱詞 * 資料格式: hadoop 20181224 * 返回格式要求: 熱詞排行前三 */ object OnlineHotItem { def main(args: Array[String]): Unit = { //過濾日誌 Logger.getLogger("org").setLevel(Level.WARN) //入口 val conf = new SparkConf().setAppName("1").setMaster("local[*]") val ssc = new StreamingContext(conf,Seconds(5)) //拉取資料 val hosItemDStream = ssc.socketTextStream("hadoop01",1234) //對資料進行處理,得到想要的資料 val serachPair = hosItemDStream.map(_.split(" ")(0)).filter(!_.isEmpty).map((_,1)) //運用視窗函式,第二個引數為視窗長度,第三個引數為視窗滑動間隔 val hotDStream = serachPair.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(60),Seconds(20)) //利用transform運算元獲取前三的排序熱詞 val result= hotDStream.transform(rdd=>{ val top3 = rdd.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).take(3) ssc.sparkContext.makeRDD(top3) }) result.print() ssc.start() ssc.awaitTermination() } }