大數據學習日誌——粗看sparkstreaming滑動窗口源碼
寫這篇隨筆的原因在於本人在網上看了很多相關博客很多文章內容給出的用法都一致是如下形式:
1 reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)
但是詳細描述函數的各個參數怎麽使用,為什麽要怎麽寫,可以怎麽修改參數的文章基本沒看到。於是便想著自己動手豐衣足食,從源碼粗略看起來,這個滑動窗口到底怎麽用!spark2.4版本
本內容主要說明滑動窗口對於丟出去的數據批次和新來的數據批次以及共同的數據批次源碼粗看和講解
1 如何使用reduceByKeyAndWindow
首先看看官網提供的圖片:
眾所周知sparkSteaming是隔一段時間將一部分數據聚合成一個批次然後處理,如上圖中一個綠框就是一個批次的數據集。
再看看源碼內容:
1 /** 2 * Return a new DStream by applying incremental `reduceByKey` over a sliding window. 3 * The reduced value of over a new window is calculated using the old window‘s reduced value : 4 * 1. reduce the new values that entered the window (e.g., adding new counts)5 * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) 6 * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. 7 * However, it is applicable to only "invertible reduce functions". 8 * @param reduceFunc associative andcommutative reduce function 9 * @param invReduceFunc inverse reduce function 10 * @param windowDuration width of the window; must be a multiple of this DStream‘s 11 * batching interval 12 * @param slideDuration sliding interval of the window (i.e., the interval after which 13 * the new DStream will generate RDDs); must be a multiple of this 14 * DStream‘s batching interval 15 * @param partitioner partitioner for controlling the partitioning of each RDD in the new 16 * DStream. 17 * @param filterFunc Optional function to filter expired key-value pairs; 18 * only pairs that satisfy the function are retained 19 */ 20 def reduceByKeyAndWindow( 21 reduceFunc: (V, V) => V, 22 invReduceFunc: (V, V) => V, 23 windowDuration: Duration, 24 slideDuration: Duration, 25 partitioner: Partitioner, 26 filterFunc: ((K, V)) => Boolean 27 ): DStream[(K, V)] = ssc.withScope { 28 29 val cleanedReduceFunc = ssc.sc.clean(reduceFunc) 30 val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) 31 val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None 32 new ReducedWindowedDStream[K, V]( 33 self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc, 34 windowDuration, slideDuration, partitioner 35 ) 36 }
一共需要6個參數,實際運算中有相關重載方法,一些參數會有默認值,這裏是最底層源函數。第5、6個參數分別是分區和過濾參數,這裏不討論,根據實際運用
1.1窗口大小和滑動長度
windowDuration為窗口大小參數,slideDuration為滑動長度參數。
一下以time表示每個批次時間長度,即每過一個time事件傳來一個批次的數據,就以上圖為例詳細描述窗口大小和滑動長度的內容:
A.窗口大小為批次3time,上圖可見window at time 3 ,wimdow at time 5都包含三個批次的數據集即窗口每次能對3time事件的數據進行運算
B.滑動事件長度為2time,上圖可見window at time 1、window at time 3,window at time 5都相隔2time,並且window at time 3 和window at time 5 所共有的批次只有一塊即1time時間內的數據,已知每個窗口還有三個3time的數據,於是相隔批次便把前2time的數據集去掉再加上2time數據。
根據上圖所示,可以想象滑動窗口的大小和滑動長度都是以批次為最小單位,即設置窗口大小和滑動長度時間時候必須是設置的批次事件的整數倍,否則會報錯;若是在初始時,進入窗口的批次並沒有填滿窗口,則窗口並不會對已經進入的批次進行計算。本篇不對這裏進行詳細說明。
1.2reduce和invReduce
1.1內容說明了窗口處理批次的時間間隔和每次處理的數據批次大小,這裏說明窗口是怎麽對進出批次進行處理,窗口中的數據怎麽運算。
考慮以下事情,若是窗口每次滑動一次,就對窗口中的所有數據全都做一次運算,這樣的實現似乎並不難,但是要是本次窗口的數據和上次窗口的數據有重復,要是每次都全部進行計算,是會對已經計算過的數據再進行一次計算,所以每次窗口都全部計算所有數據的方式時可以優化的,要是存在重復數據的情況下,只需要保留上次計算的重復數據計算值,然後減去上次運算多余的計算值,再加上本次運算新加入的數據計算值,於是reduceByKeyAndWindow便提供了如此的實現方式:
reduceFunc參數是窗口數據的處理函數,這是最基本的,通過這個函數獲得想要的每個批次運算值。
invReduceFunc參數時窗口數據減去上一次運算多余的數據值函數。
2 滑動窗口數據處理過程
現在知道了reduceFunc是用來對窗口數據進行運算的,那麽這個運算是怎麽對相鄰窗口運算優化的、invReduceFunc是怎麽減去上次運算多余數據的,一下便從源碼探究。
從reduceByKeyAndWindow函數中可見,此函數最終調用是new ReducedWindowedDStream這個class,一起進入這個函數看看:
1 class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( 2 parent: DStream[(K, V)], 3 reduceFunc: (V, V) => V, 4 invReduceFunc: (V, V) => V, 5 filterFunc: Option[((K, V)) => Boolean], 6 _windowDuration: Duration, 7 _slideDuration: Duration, 8 partitioner: Partitioner 9 ) extends DStream[(K, V)](parent.ssc)
著重關註三個參數parent是這個窗口的DStream,reudceFunc、invReduceFunc同上,不在描述,
然後到compute函數中看看,為避免篇幅過長,只看部分關鍵代碼。
源碼中給出了一張圖,描述了內部運算中分出的運算對象:
// _____________________________
// | previous window _________|___________________
// |___________________| current window | --------------> Time
// |_____________________________|
//
// |________ _________| |________ _________|
// | |
// V V
// old RDDs new RDDs
//
從代碼中看看那幾個關鍵對象,從代碼中摘出
val currentTime = validTime val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime) val previousWindow = currentWindow - slideDuration // Get the RDDs of the reduced values in "old time steps" val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) logDebug("# old RDDs = " + oldRDDs.size) // Get the RDDs of the reduced values in "new time steps" val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime) logDebug("# new RDDs = " + newRDDs.size) // Get the RDD of the reduced value of the previous window val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]())) // Make the list of RDDs that needs to cogrouped together for reducing their reduced values val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs // Cogroup the reduced RDDs and merge the reduced values val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
這裏說明一下,只需要看參數名和上圖的對應關系即可,本身計算變量的過程和上圖不怎麽對應得上,無論currentTime是哪個時間的值,最終的參數currentWIndow並不是上圖表示的currentWindow而是一個間隔為兩次運算重復的時間長度,然後通過這個變量運算的oldRDDs、newRDDs反而是對的。個人猜測可能時開發人員初期確實時想做成currentTime這樣的理解,但是實際運算並不理想比如currentTime值並不是那麽的在標準時間上還是其他原因,為了更通用,於是改了時間邏輯,但是變量名沒變。如有錯誤,歡迎指出一起學習!
現在重點看cogroupedRDD,這是把allRDDs轉為cogroupedRDD,之後的計算過程都是通過cogroupedRDD操作。然後還有兩個參數也需要註意:
1 val numOldValues = oldRDDs.size
2 val numNewValues = newRDDs.size
先看看這個數據時怎麽處理的:
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
.mapValues(mergeValues)
即把cogroupedRDD調用mergeValues運算,mapValues相當於數據整理後分區map運算,這裏不深入
然後是關鍵的運算部分,在這一部分不同版本的spark源碼會大有不同:
1 val mergeValues = (arrayOfValues: Array[Iterable[V]]) => { 2 if (arrayOfValues.length != 1 + numOldValues + numNewValues) { 3 throw new Exception("Unexpected number of sequences of reduced values") 4 } 5 // Getting reduced values "old time steps" that will be removed from current window 6 val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head) 7 // Getting reduced values "new time steps" 8 val newValues = 9 (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) 10 11 if (arrayOfValues(0).isEmpty) { 12 // If previous window‘s reduce value does not exist, then at least new values should exist 13 if (newValues.isEmpty) { 14 throw new Exception("Neither previous window has value for key, nor new values found. " + 15 "Are you sure your key class hashes consistently?") 16 } 17 // Reduce the new values 18 newValues.reduce(reduceF) // return 19 } else { 20 // Get the previous window‘s reduced value 21 var tempValue = arrayOfValues(0).head 22 // If old values exists, then inverse reduce then from previous value 23 if (!oldValues.isEmpty) { 24 tempValue = invReduceF(tempValue, oldValues.reduce(reduceF)) 25 } 26 // If new values exists, then reduce them with previous value 27 if (!newValues.isEmpty) { 28 tempValue = reduceF(tempValue, newValues.reduce(reduceF)) 29 } 30 tempValue // return 31 } 32 }
oldValues把oldRDDs的數據清理一遍拿出來,newValues同理。
這裏關註源碼中早就定義的兩個變量:
1 val reduceF = reduceFunc
2 val invReduceF = invReduceFunc
reduceF即要對窗口數據處理的函數,invReduceF對上一次窗口數據多余部分的處理函數
arrayOfValues(0)根據allRDDs知道這個部分是previousWindowRDD
arrayOfValues.isEmpty若為真,即沒有上一次滑動窗口數據即對新數據進行reduce(reduceF)運算即可完成,reduce函數就是調用參數對本身數據集元素處理的函數,這裏不詳細說明,源碼可見相關內容。
arrayOfValues.isEmpty若為假,即需要處理上次窗口多余部分數據,將之取出為tempValue
當oldRDDs數據存在則通過invReduceF函數處理上次窗口函數和oldRDDs的reduceF運算後的數據賦值給tempValue
當newRDDs數據存在則通過reduceF函數處理tempValue和newRDDs自身reduceF運算後的值。
這裏便把源碼計算過程看完了。
3 總結
A.窗口大小和滑動長度都必須是批次時間的整數倍
B.reduceFunc函數必須滿足結合律,單獨數據之間的運算,多個單獨數據的運算緩存值和單個數據間的運算以及多個多個單獨數據的運算緩存值之間的運算,無論怎麽組合都保持一致性
C.invReduceFunc函數必須是reduceFunc逆元運算過程
大白話講reduceFunc、invReduceFunc兩個函數,reduceFunc要是加法、乘法運算,那麽invReduceFunc就得是減法,除法運算;若reduceFunc要是求平均值運算,若只給出一個平均值數值,就會出問題,若要實現必須給出數據數量,為避免平均值省略數位誤差的情況最好再給出總數據值大小,甚至可以在外部代碼中計算平均值,reduceFunc函數主要計算總數值大小以及數據數量。
大數據學習日誌——粗看sparkstreaming滑動窗口源碼