Developing a reduceByKeyAndWindow Example in Spark Streaming
package SparkStreamingTest.Scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by TG.
 * Every 2 seconds, compute the top 3 search terms and their occurrence counts over the last 5 seconds.
 */
object ReduceByKeyAndWindowDemo {
  def main(args: Array[String]): Unit = {
    // Set the log level so the console output is not flooded with INFO messages.
    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // A checkpoint directory is required when a DStream is marked for checkpointing.
    // "checkpoint" is a placeholder local path; use a reliable store such as HDFS in production.
    ssc.checkpoint("checkpoint")

    // A storage level such as StorageLevel.MEMORY_AND_DISK_SER_2 can also be passed to socketTextStream.
    val linesDStream = ssc.socketTextStream("master", 6666)
    // linesDStream.persist()  // e.g. StorageLevel.MEMORY_ONLY_SER
    linesDStream.checkpoint(Seconds(10))

    linesDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(2))
      .transform(rdd => {
        // Swap key and value, sort by count in descending order, then take the top 3.
        val result: Array[(String, Int)] =
          rdd.map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).take(3)
        // result is an Array rather than an RDD, so turn it back into an RDD here.
        val resultRDD = ssc.sparkContext.parallelize(result)
        // Note: transform must return an RDD, so return the resultRDD produced above.
        resultRDD
      })
      .map(x => x._1 + " appeared " + x._2 + " times")
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
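To try the job, a socket source must be listening on port 6666 of the host named "master" (for example `nc -lk 6666`). Each line typed into that terminal is split on spaces into search terms, and every 2 seconds the job prints the top 3 terms seen in the last 5 seconds.

The version above recomputes the whole window on every slide. Spark Streaming also provides an incremental overload of reduceByKeyAndWindow that takes an inverse reduce function, so only the batches entering and leaving the window are reprocessed; checkpointing must be enabled for this variant. The snippet below is a minimal sketch of that overload, not part of the original post; it assumes the same ssc, checkpoint directory, and linesDStream as in the example above.

// Incremental window reduction: add counts from the batch entering the window
// and subtract counts from the batch leaving it.
val windowedCounts = linesDStream
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b,   // applied to data entering the window
    (a: Int, b: Int) => a - b,   // applied to data leaving the window
    Seconds(5),                  // window length
    Seconds(2)                   // slide interval
  )
windowedCounts.print()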