spark運算元:滑動視窗函式reduceByKeyAndWindow的使用
1.reduceByKeyAndWindow這個運算元也是lazy的,它用來計算一個區間裡面的資料,如下圖:
截圖自官網,例如每個方塊代表5秒鐘,上面的虛線框住的是3個視窗就是15秒鐘,這裡的15秒鐘就是視窗的長度,其中虛線到實線移動了2個方塊表示10秒鐘,這裡的10秒鐘就表示每隔10秒計算一次視窗長度的資料
舉個例子: 如下圖
我是這樣理解的:如果這裡是使用視窗函式計算wordcount 在第一個視窗(虛線視窗)計算出來(aa, 1)(bb,3)(cc,1)當到達時間10秒後窗口移動到實線視窗,就會計算這個實線視窗中的單詞,這裡就為(bb,1)(cc,2)(aa,1)
附上程式:
注意:視窗滑動長度和視窗長度一定要是SparkStreaming微批處理時間的整數倍,不然會報錯.
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object SparkWindowDemo {
val myfunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
it.map(x => {
(x._1, x._2.sum + x._3.getOrElse( 0))
})
}
def main(args: Array[String]): Unit = {
MyLog.setLogLeavel(Level.WARN)
val conf = new SparkConf().setMaster("local[2]").setAppName("window")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))
sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\myck01" )
val ds = ssc.socketTextStream("192.168.80.123", 9999)
//Seconds(20)表示視窗的寬度 Seconds(10)表示多久滑動一次(滑動的時間長度)
val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10))
// 視窗長度和滑動的長度一致,那麼類似於每次計算自己批量的資料,用updateStateByKey也可以累計計算單詞的wordcount 這裡只是做個是實驗
// val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(4), Seconds(4)).updateStateByKey(myfunc, new HashPartitioner(sc.defaultParallelism), true)
re.print()
ssc.start()
ssc.awaitTermination()
}
}
2.
reduceByKeyAndWindow基於滑動視窗的熱點搜尋詞實時統計
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* 基於滑動視窗的熱點搜尋詞實時統計
* 每隔5秒鐘,統計最近20秒鐘的搜尋詞的搜尋頻次,
* 並打印出排名最靠前的3個搜尋詞以及出現次數
*
*/
object WindowDemo {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("WindowDemo")
.setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(5))
//從nc服務中獲取資料,資料格式:name word,比如:張三 大資料
val linesDStream=ssc.socketTextStream("tgmaster",9999)
//將資料中的搜尋詞取出
val wordsDStream=linesDStream.map(_.split(" ")(1))
//通過map運算元,將搜尋詞形成鍵值對(word,1),將搜尋詞記錄為1次
val searchwordDStream=wordsDStream.map(searchword=>(searchword,1))
//通過reduceByKeyAndWindow運算元,每隔5秒統計最近20秒的搜尋詞出現的次數
val reduceDStream=searchwordDStream.reduceByKeyAndWindow(
(v1:Int,v2:Int)=>
v1+v2,Seconds(20),Seconds(5)
)
//呼叫DStream中的transform運算元,可以進行資料轉換
val transformDStream=reduceDStream.transform(searchwordRDD=>{
val result=searchwordRDD.map(m=>{ //將key與value互換位置
(m._2,m._1)
}).sortByKey(false) //根據key進行降序排列
.map(m=>{ //將key與value互換位置
(m._2,m._1)
}).take(3) //取前3名
for(elem<-result){
println(elem._1+" "+elem._2)
}
searchwordRDD //注意返回值
})
transformDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
4.
最近在玩spark streaming, 感覺到了他的強大。 然後看 StreamingContext的原始碼去理解spark是怎麼完成計算的。 大部分的原始碼比較容易看懂, 但是這個
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
還是花了不少時間。 主要還是由於對spark不熟悉造成的吧, 還好基本弄明白了。
總的來說SparkStreaming提供這個方法主要是出於效率考慮。 比如說我要每10秒計算一下前15秒的內容,(每個batch 5秒), 可以想象每十秒計算出來的結果和前一次計算的結果其實中間有5秒的時間值是重複的。
那麼就是通過如下步驟
1. 儲存上一個window的reduce值
2.計算出上一個window的begin 時間到 重複段的開始時間的reduce 值 =》 oldRDD
3.重複時間段的值結束時間到當前window的結束時間的值 =》 newRDD
4.重複時間段的值等於上一個window的值減去oldRDD
這樣就不需要去計算每個batch的值, 只需加加減減就能得到新的reduce出來的值。
從程式碼上面來看, 入口為:
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
一步一步跟蹤進去, 可以看到實際的業務類是在ReducedWindowedDStream 這個類裡面:
程式碼理解就直接拿這個類來看了: 主要功能是在compute裡面實現, 通過下面程式碼回撥mergeValues 來計算最後的返回值
Scala程式碼
- val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
- .mapValues(mergeValues)
先計算oldRDD 和newRDD
//currentWindow 就是以當前時間回退一個window的時間再向前一個batch 到當前時間的視窗 程式碼裡面有一個圖很有用:
我們要計算的new rdd就是15秒-25秒期間的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值
然後最終結果是 重複區間(previous window的值 - oldRDD的值) =》 也就是中間重複部分, 再加上newRDD的值, 這樣的話得到的結果就是10秒到25秒這個時間區間的值
Scala程式碼
- // 0秒 10秒 15秒 25秒
- // _____________________________
- // | previous window _________|___________________
- // |___________________| current window | --------------> Time
- // |_____________________________|
- //
- // |________ _________| |________ _________|
- // | |
- // V V
- // old RDDs new RDDs
- //
Scala程式碼
- val currentTime = validTime
- val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
- currentTime)
- val previousWindow = currentWindow - slideDuration
- val oldRDDs =
- reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
- logDebug("# old RDDs = " + oldRDDs.size)
- // Get the RDDs of the reduced values in "new time steps"
- val newRDDs =
- reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
- logDebug("# new RDDs = " + newRDDs.size)
得到newRDD和oldRDD後就要拿到previous windows的值: 如果第一次沒有previous window那麼建一個空RDD, 為最後計算結果時 arrayOfValues(0).isEmpty 鋪墊
Java程式碼
- val previousWindowRDD =
- getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))
然後把所有的值放到一個數組裡面 0是previouswindow, 1到oldRDD.size是oldrdd, oldRDD.size到newRDD.size是newrdd
Scala程式碼
- val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
將每個RDD的(K,V) 轉變成(K, Iterator(V))的形式:
比如說有兩個值(K,a) 和(K,b) 那麼coGroup後就會成為(K, Iterator(a,b))這種形式
Java程式碼
- val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
- partitioner)
進行最後的計算:
Scala程式碼
- val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
- ...
- }
首先判斷RDD的value數量是不是正確 previous window因為已經計算過所以只有一組值
正確值為 1 (previous window value) + numOldValues (oldRDD 每個RDD的value) + numNewValues (newRDD 每個RDD的value)
Java程式碼
- if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
- throw new Exception("Unexpected number of sequences of reduced values")
- }
接下來取出oldRDD的值和newRDD的值:
Scala程式碼
- val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
- val newValues =
- (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
如果previous window是空的, 那麼就直接計算newRDD的值(這也是為什麼每次計算時候第一次打出來的值都比較少, 因為他只有newRDD部分沒有重合部分, 也就是隻有10秒的內容而不是15秒)
Scala程式碼
- if (arrayOfValues(0).isEmpty) {
- // If previous window's reduce value does not exist, then at least new values should exist
- if (newValues.isEmpty) {
- throw new Exception("Neither previous window has value for key, nor new values found. " +
- "Are you sure your key class hashes consistently?")
- }
- // Reduce the new values
- newValues.reduce(reduceF) // return
- }
如果有previous window的值, 那麼先存到tempValue, 如果有oldRDD那麼減去oldRDD, 如果有newRDD (一般都有) 那麼加上newRDD的值 這樣就組成上圖裡面10到25秒區間的值了
Scala程式碼
- else {
- // Get the previous window's reduced value
- var tempValue = arrayOfValues(0).head
- // If old values exists, then inverse reduce then from previous value
- if (!oldValues.isEmpty) {
- tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
- }
- // If new values exists, then reduce them with previous value
- if (!newValues.isEmpty) {
- tempValue = reduceF(tempValue, newValues.reduce(reduceF))
- }
- tempValue // return
- }
最後如果有filter的function的話就filter一下:
Scala程式碼
- if (filterFunc.isDefined) {
- Some(mergedValuesRDD.filter(filterFunc.get))
- } else {
- Some(mergedValuesRDD)
- }
參考:
http://humingminghz.iteye.com/blog/2308231
http://www.cnblogs.com/zDanica/p/5471592.html
https://yq.aliyun.com/articles/60315
http://www.medsci.cn/article/show_article.do?id=87c455942b8