spark使用window來統計近幾分鐘資料情況
阿新 • • 發佈:2018-12-12
package com.xyf import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object sparkStreamingTest { def main(args: Array[String]){ val conf = new SparkConf() //建立SparkConf物件 conf.setAppName("sparkStreamingTest") //設定應用程式的名稱,在程式執行的監控介面可以看到名稱 conf.setMaster("local[3]") //此時,程式在Spark叢集 /* * 此處設定 Batch Interval 實在spark Streaming 中生成基本Job的單位,視窗和滑動時間間隔 * 一定是該batch Interval的整數倍*/ val ssc = new StreamingContext(conf, Seconds(5)) val hottestStream = ssc.socketTextStream("192.168.0.100", 9999) val searchPair = hottestStream.flatMap(line => line.split(",")).map(item => (item , 1)) //reducefunction計算每個rdd的和,60s是視窗,20是滑動步長 val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int) => v1 + v2, Seconds(60) ,Seconds(20)) hottestDStream.transform(hottestItemRDD => { //將pair._2,pair._1反過來,通過數字來排序,然後反轉,最終獲取前三個列印 val top3 = hottestItemRDD.map(pair => (pair._2,pair._1) ).sortByKey(false). map(pair => (pair._2,pair._1)).take(3) for(item <- top3){ println(item) } hottestItemRDD }).print() ssc.start() ssc.awaitTermination() } }
windows環境0:
java scala nc.exe
首先需要開啟埠監聽
D:\> nc -L -p 9999 -v
正在監聽[任何一個(多個)] 9999 ...
連線到 [192.168.0.100] 來自 PC-20161208TLXD [192.168.0.100] 5681
113,edf
112,3fr
12f,ghj
46t,fhgd
qwe,fg
然後啟動程式碼