1. 程式人生 > >理解SparkSteaming視窗函式操作window()

理解SparkSteaming視窗函式操作window()

需求場景:

     一些業務場景,例如網站記錄,每隔1個小時計算最近兩個小時的pv量,還有一種業務場景的話先在記憶體中做累加再更新到redis中做累加,比如說每隔5秒統計最近5秒的資料的總和,再刷到redis中做累加,因為頻繁操作redis的話會存在問題。

重要引數:

1.批處理間隔

2.視窗間隔

3.滑動時間間隔

原理介紹:

       在Spark Streaming中,資料處理是按批進行的,而資料採集是逐條進行的,因此在Spark Streaming中會先設定好批處理間隔(batch duration),

當超過批處理間隔的時候就會把採集到的資料彙總起來成為一批資料交給系統去處理。

對於視窗操作而言,在其視窗內部會有N個批處理資料,批處理資料的大小由視窗間隔(windowduration)決定,

而視窗間隔指的就是視窗的持續時間,在視窗操作中,只有視窗的長度滿足了才會觸發批資料的處理。

除了視窗的長度,視窗操作還有另一個重要的引數就是滑動間隔(slide duration),它指的是經過多長時間視窗滑動一次形成新的視窗,滑動視窗預設情況下和批次間隔的相同,而視窗間隔一般設定的要比它們兩個大。在這裡必須注意的一點是滑動間隔和視窗間隔的大小一定得設定為批處理間隔的整數倍。

如批處理間隔示意圖所示,批處理間隔是1個時間單位,視窗間隔是3個時間單位,滑動間隔是2個時間單位。對於初始的視窗time 1-time 3,只有視窗間隔滿足了才觸發資料的處理。這裡需要注意的一點是,初始的視窗有可能流入的資料沒有撐滿,但是隨著時間的推進,視窗最終會被撐滿。當每個2個時間單位,視窗滑動一次後,會有新的資料流入視窗,這時視窗會移去最早的兩個時間單位的資料,而與最新的兩個時間單位的資料進行彙總形成新的視窗(time3-time5)。

對於視窗操作,批處理間隔、視窗間隔和滑動間隔是非常重要的三個時間概念,是理解視窗操作的關鍵所在。

舉例:

如上圖顯示,視窗在源 DStream 上 slides(滑動),合併和操作落入窗內的源 RDDs,產生視窗化的 DStream 的 RDDs。在這個具體的例子中,程式在三個時間單元的資料上進行視窗操作,並且每兩個時間單元滑動一次。 這說明,任何一個視窗操作都需要指定兩個引數.

window length(視窗長度) - 視窗的持續時間(圖 3).

sliding interval(滑動間隔) - 執行視窗操作的間隔(圖 2).

這兩個引數必須是 source DStream 的 batch interval(批間隔)的倍數(圖 1).

讓我們舉例以說明視窗操作. 例如,你想擴充套件前面的例子用來計算過去 30 秒的詞頻,間隔時間是 10 秒. 為了達到這個目的,我們必須在過去 30 秒的 (wrod, 1) pairs 的 pairs DStream 上應用 reduceByKey 操作. 用方法 reduceByKeyAndWindow 實現.

// Reduce last 30 seconds of data, every 10 seconds

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

上圖的time 1處理之前的30秒的資料,time3 處理之前time1到time3共30秒的資料,依次類推