Spark Streaming: window operations
A window operation slides a window of configurable length over a DStream at a configurable rate, and applies the specified operator to the data that currently falls inside the window.
Note that both the window length and the slide interval must be integer multiples of the batch interval.
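As a minimal illustration of this constraint (a sketch using a socket source on an assumed localhost:9999 instead of Kafka):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("windowSketch").setMaster("local[2]")
    // Batch interval: 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))
    val lines = ssc.socketTextStream("localhost", 9999)
    // Window length 8 s and slide interval 4 s are both multiples of the 2 s batch interval;
    // values such as Seconds(3) or Seconds(5) would be rejected because they are not.
    val windowed = lines.window(Seconds(8), Seconds(4))
    windowed.print()
    ssc.start()
    ssc.awaitTermination()
  }
}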
1. window(windowLength, slideInterval)
Called on a DStream with a window length and a slide interval, this operation returns a new DStream whose batches contain all the elements that currently fall inside the window.
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkWindowDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("kafkasource").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2)) // batch interval: 2 seconds
    streamingContext.checkpoint("checkpoint")
    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.195.20:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "KafkaGroup2"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("sparkKafkaDemo"), kafkaParams)
      )
    val numStream: DStream[(String, Int)] = kafkaStream
      .flatMap(line => line.value().split("\\s+"))
      .map((_, 1))
      .window(Seconds(8), Seconds(4)) // window length 8 s, slide interval 4 s, both multiples of the batch interval
    numStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Start Kafka and produce some messages to test:
kafka-console-producer.sh --topic sparkKafkaDemo --broker-list 192.168.195.20:9092
2. countByWindow(windowLength, slideInterval)
Returns the number of elements in the window of the given length.
Note: a checkpoint directory must be set.
object SparkWindowDemo2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("kafkasource").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2)) // batch interval: 2 seconds
    streamingContext.checkpoint("checkpoint")
    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.195.20:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "KafkaGroup2"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("sparkKafkaDemo"), kafkaParams)
      )
    val numStream: DStream[Long] = kafkaStream
      .flatMap(line => line.value().split("\\s+"))
      .map((_, 1))
      .countByWindow(Seconds(8), Seconds(4)) // window length 8 s, slide interval 4 s, both multiples of the batch interval
    numStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Again, produce messages with the Kafka console producer to test.
3. countByValueAndWindow(windowLength, slideInterval, [numTasks])
Counts, for each distinct value, how many times it occurs in the current window.
Note: a checkpoint directory must be set.
The boilerplate code is the same as above and is omitted:
val numStream: DStream[(String, Long)] = kafkaStream
  .flatMap(line => line.value().split("\\s+"))
  .countByValueAndWindow(Seconds(8), Seconds(4))
4. reduceByWindow(func, windowLength, slideInterval)
Forms a new DStream from the elements in the current window and then reduces those elements with func.
val numStream: DStream[String] = kafkaStream
.flatMap(line => line.value().toString.split("\\s+"))
.reduceByWindow(_ + ":" + _, Seconds(8), Seconds(2))
5. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow aggregates, per key, all the data that falls within the window of the DStream; it takes an optional argument for the number of parallel tasks. The example below uses the overload that also takes an inverse function (invFunc), which updates the window incrementally by adding the batch that enters and subtracting the batch that leaves; this variant requires checkpointing.
val numStream: DStream[(String, Int)] = kafkaStream
  .flatMap(line => line.value().split("\\s+"))
  .map((_, 1))
  .reduceByKeyAndWindow(
    (x: Int, y: Int) => x + y, // add counts from the batch entering the window
    (x: Int, y: Int) => x - y, // subtract counts from the batch leaving the window
    Seconds(8), Seconds(2)
  )
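For comparison, a sketch of the simpler overload without an inverse function, which recomputes the whole window on every slide and does not require checkpointing (reusing kafkaStream from the examples above):
val fullRecompute: DStream[(String, Int)] = kafkaStream
  .flatMap(line => line.value().split("\\s+"))
  .map((_, 1))
  .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(8), Seconds(2))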
6. transform
transform applies an arbitrary RDD-to-RDD function to every batch of the DStream, which makes it possible to use RDD operations that are not exposed directly in the DStream API. The overload used below also receives the batch time, so each record can be tagged with the timestamp of its batch.
val numStream: DStream[((String, String), Int)] = kafkaStream.transform((rdd, timestamp) => {
  val format = new SimpleDateFormat("yyyyMMdd HH:mm:ss")
  val time: String = format.format(timestamp.milliseconds) // batch time as a formatted string
  val value: RDD[((String, String), Int)] = rdd
    .flatMap(x => x.value().split("\\s+"))
    .map(x => ((x, time), 1)) // key each word by (word, batchTime)
    .reduceByKey((x, y) => x + y)
    .sortBy(x => x._2, ascending = false) // sort by count, descending, within the batch
  value
})
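Note that transform only defines the per-batch computation; an output operation such as numStream.print() is still needed to trigger it, and the sortBy above orders the words within each batch, not across the whole stream.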
7. Spark SQL + Spark Streaming transform
Inside transform, each batch RDD can be converted to a DataFrame and queried with Spark SQL; a lazily initialized singleton SQLContext is shared across batches.
object SparkWindowDemo7 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("kafkasource").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(4)) // batch interval: 4 seconds
    streamingContext.checkpoint("checkpoint")
    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.195.20:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "KafkaGroup5"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("sparkKafkaDemo"), kafkaParams)
      )
    val numStream: DStream[Row] = kafkaStream.transform(rdd => {
      val sqlContext: SQLContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      val words: RDD[String] = rdd.flatMap(_.value().split("\\s+"))
      val tuple2RDD: RDD[(String, Int)] = words.map((_, 1))
      tuple2RDD.toDF("name", "cn").createOrReplaceTempView("tbwordcount")
      val frame: DataFrame = sqlContext
        .sql("select name, count(cn) from tbwordcount group by name")
      frame.rdd
    })
    numStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    synchronized {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
    }
    instance
  }
}
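On Spark 2.x and later, SQLContext is superseded by SparkSession; an equivalent lazily created singleton could look like this (a sketch, not part of the original post):
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSessionSingleton {
  @transient private var instance: SparkSession = _
  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      // Build the session once and reuse it for every batch
      instance = SparkSession.builder.config(sparkConf).getOrCreate()
    }
    instance
  }
}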
8. Print where code runs (driver vs. executor) to understand the execution model
// Variant 1
// println("driver")
// val numStream: DStream[String] = kafkaStream.flatMap(line => {
//   println("executor")
//   line.value().split("\\s+")
// })

// Variant 2
// println("driver") // runs only once, on the driver
// val numStream: DStream[String] = kafkaStream
//   .transform(rdd => {
//     println("bb") // runs once per batch interval, whether or not there is data
//     println(rdd)
//     val value: RDD[String] = rdd.flatMap(x => {
//       println("cc") // runs only when data is actually received
//       x.value().split("\\s+")
//     })
//     value
//   })
// Variant 3
println("driver")
kafkaStream.foreachRDD(rdd => {
  println("bb")
  // foreachPartition could be used instead of foreach; it is usually preferred in production,
  // but pulling a whole partition into memory at once can cause out-of-memory errors and crash the executor
  rdd.foreach(x => {
    println("cc")
    val strings: Array[String] = x.value().split("\\s+")
    println(strings.toList)
  })
})
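As a sketch of the foreachPartition alternative mentioned in the comment above (useful when each partition needs, for example, one connection to an external store; the setup comment is only indicative):
kafkaStream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // One pass over the records of this partition on the executor;
    // any expensive setup (e.g. a database connection) would be created once here.
    partition.foreach(record => {
      val words = record.value().split("\\s+")
      println(words.toList)
    })
  })
})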