Spark Streaming 滑動視窗
Spark Streaming提供了滑動視窗操作的支援,從而讓我們可以對一個滑動視窗內的資料執行計算操作。每次掉落在視窗內的RDD的資料,會被聚合起來執行計算操作,然後生成的RDD,會作為window DStream的一個RDD。
網官圖中所示,就是對每三秒鐘的資料執行一次滑動視窗計算,這3秒內的3個RDD會被聚合起來進行處理,然後過了兩秒鐘,又會對最近三秒內的資料執行滑動視窗計算。所以每個滑動視窗操作,都必須指定兩個引數,視窗長度以及滑動間隔,而且這兩個引數值都必須是batch間隔的整數倍。
Spark Streaming對滑動視窗的支援,是比Storm更加完善和強大的。
之前有些朋友問:
spark官網圖片中: 滑動視窗寬度是3個時間單位,滑動時間是2兩個單位,這樣的話中間time3的Dstream不是重複計算了嗎?
Answer:比如下面這個例子是針對熱搜的應用場景,官方的例子也可能是是針對不同的場景給出了的。如果你不想出現重疊的部分,把滑動間隔由2改成3即可
SparkStreaming對滑動視窗支援的轉換操作:
示例講解:
1、window(windowLength, slideInterval)
該操作由一個DStream物件呼叫,傳入一個視窗長度引數,一個視窗移動速率引數,然後將當前時刻當前長度視窗中的元素取出形成一個新的DStream。
下面的程式碼以長度為3,移動速率為1擷取源DStream中的元素形成新的DStream。
val windowWords = words.window(Seconds( 3 ), Seconds( 1))
基本上每秒輸入一個字母,然後取出當前時刻3秒這個長度中的所有元素,打印出來。從上面的截圖中可以看到,下一秒時已經看不到a了,再下一秒,已經看不到b和c了。表示a, b, c已經不在當前的視窗中。
2、 countByWindow(windowLength,slideInterval)
返回指定長度視窗中的元素個數。
程式碼如下,統計當前3秒長度的時間視窗的DStream中元素的個數:
val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))
3、 reduceByWindow(func, windowLength,slideInterval)
類似於上面的reduce操作,只不過這裡不再是對整個呼叫DStream進行reduce操作,而是在呼叫DStream上首先取視窗函式的元素形成新的DStream,然後在視窗元素形成的DStream上進行reduce。
val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))
4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])
呼叫該操作的DStream中的元素格式為(k, v),整個操作類似於前面的reduceByKey,只不過對應的資料來源不同,reduceByKeyAndWindow的資料來源是基於該DStream的視窗長度中的所有資料。該操作也有一個可選的併發數引數。
下面程式碼中,將當前長度為3的時間視窗中的所有資料元素根據key進行合併,統計當前3秒中內不同單詞出現的次數。
val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))
5、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])
這個視窗操作和上一個的區別是多傳入一個函式invFunc。前面的func作用和上一個reduceByKeyAndWindow相同,後面的invFunc是用於處理流出rdd的。
在下面這個例子中,如果把3秒的時間視窗當成一個池塘,池塘每一秒都會有魚遊進或者游出,那麼第一個函式表示每由進來一條魚,就在該類魚的數量上累加。而第二個函式是,每由出去一條魚,就將該魚的總數減去一。
val windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))
下面是演示結果,最終的結果是該3秒長度的視窗中歷史上出現過的所有不同單詞個數都為0。
段時間不輸入任何資訊,看一下最終結果
6、 countByValueAndWindow(windowLength,slideInterval, [numTasks])
類似於前面的countByValue操作,呼叫該操作的DStream資料格式為(K, v),返回的DStream格式為(K, Long)。統計當前時間視窗中元素值相同的元素的個數。
val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))
示例二:熱點搜尋詞滑動統計,每隔10秒鐘,統計最近60秒鐘的搜尋詞的搜尋頻次,並打印出排名最靠前的3個搜尋詞以及出現次數
Scala版本:
package
com.spark.streaming
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
/**
* @author Ganymede
*/
object WindowHotWordS {
def main(args: Array[String]): Unit = {
val conf = new
SparkConf().setAppName("WindowHotWordS").setMaster("local[2]")
//Scala中,建立的是StreamingContext
val ssc = new StreamingContext(conf,
Seconds(5))
val searchLogsDStream =
ssc.socketTextStream("spark1", 9999)
val searchWordsDStream =
searchLogsDStream.map { searchLog => searchLog.split(" ")(1)
}
val searchWordPairDStream = searchWordsDStream.map
{ searchWord => (searchWord, 1) }
// reduceByKeyAndWindow
// 第二個引數,是視窗長度,這是是60秒
// 第三個引數,是滑動間隔,這裡是10秒
// 也就是說,每隔10秒鐘,將最近60秒的資料,作為一個視窗,進行內部的RDD的聚合,然後統一對一個RDD進行後續計算
// 而是隻是放在那裡
// 然後,等待我們的滑動間隔到了以後,10秒到了,會將之前60秒的RDD,因為一個batch間隔是5秒,所以之前60秒,就有12個RDD,給聚合起來,然後統一執行reduceByKey操作
// 所以這裡的reduceByKeyAndWindow,是針對每個視窗執行計算的,而不是針對 某個DStream中的RDD
// 每隔10秒鐘,出來之前60秒的收集到的單詞的統計次數
val searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2,
Seconds(60), Seconds(10))
val finalDStream =
searchWordCountsDStream.transform(searchWordCountsRDD => {
val countSearchWordsRDD =
searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))
val sortedCountSearchWordsRDD =
countSearchWordsRDD.sortByKey(false)
val sortedSearchWordCountsRDD =
sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))
val top3SearchWordCounts =
sortedSearchWordCountsRDD.take(3)
for (tuple <-
top3SearchWordCounts) {
println("result : " +
tuple)
}
searchWordCountsRDD
})
finalDStream.print()
ssc.start()
ssc.awaitTermination()
}
}