1. 程式人生 > >spark運算元:滑動視窗函式reduceByKeyAndWindow的使用

spark運算元:滑動視窗函式reduceByKeyAndWindow的使用

1.reduceByKeyAndWindow這個運算元也是lazy的,它用來計算一個區間裡面的資料,如下圖:

這裡寫圖片描述

截圖自官網,例如每個方塊代表5秒鐘,上面的虛線框住的是3個視窗就是15秒鐘,這裡的15秒鐘就是視窗的長度,其中虛線到實線移動了2個方塊表示10秒鐘,這裡的10秒鐘就表示每隔10秒計算一次視窗長度的資料

舉個例子: 如下圖

這裡寫圖片描述

我是這樣理解的:如果這裡是使用視窗函式計算wordcount 在第一個視窗(虛線視窗)計算出來(aa, 1)(bb,3)(cc,1)當到達時間10秒後窗口移動到實線視窗,就會計算這個實線視窗中的單詞,這裡就為(bb,1)(cc,2)(aa,1)

附上程式:

注意:視窗滑動長度和視窗長度一定要是SparkStreaming微批處理時間的整數倍,不然會報錯.

import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SparkWindowDemo {

  val myfunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map(x => {
      (x._1, x._2.sum + x._3.getOrElse(
0)) }) } def main(args: Array[String]): Unit = { MyLog.setLogLeavel(Level.WARN) val conf = new SparkConf().setMaster("local[2]").setAppName("window") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(2)) sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\myck01"
) val ds = ssc.socketTextStream("192.168.80.123", 9999) //Seconds(20)表示視窗的寬度 Seconds(10)表示多久滑動一次(滑動的時間長度) val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10)) // 視窗長度和滑動的長度一致,那麼類似於每次計算自己批量的資料,用updateStateByKey也可以累計計算單詞的wordcount 這裡只是做個是實驗 // val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(4), Seconds(4)).updateStateByKey(myfunc, new HashPartitioner(sc.defaultParallelism), true) re.print() ssc.start() ssc.awaitTermination() } }

2.

reduceByKeyAndWindow基於滑動視窗的熱點搜尋詞實時統計


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
  *
  * 基於滑動視窗的熱點搜尋詞實時統計
  * 每隔5秒鐘,統計最近20秒鐘的搜尋詞的搜尋頻次,
  * 並打印出排名最靠前的3個搜尋詞以及出現次數
  *
  */
object WindowDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("WindowDemo")
                .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))


    //從nc服務中獲取資料,資料格式:name word,比如:張三 大資料
    val linesDStream=ssc.socketTextStream("tgmaster",9999)
    //將資料中的搜尋詞取出
    val wordsDStream=linesDStream.map(_.split(" ")(1))
    //通過map運算元,將搜尋詞形成鍵值對(word,1),將搜尋詞記錄為1次
    val searchwordDStream=wordsDStream.map(searchword=>(searchword,1))
    //通過reduceByKeyAndWindow運算元,每隔5秒統計最近20秒的搜尋詞出現的次數
    val reduceDStream=searchwordDStream.reduceByKeyAndWindow(
      (v1:Int,v2:Int)=>
        v1+v2,Seconds(20),Seconds(5)
    )
    //呼叫DStream中的transform運算元,可以進行資料轉換
    val transformDStream=reduceDStream.transform(searchwordRDD=>{
      val result=searchwordRDD.map(m=>{  //將key與value互換位置
        (m._2,m._1)
      }).sortByKey(false) //根據key進行降序排列
        .map(m=>{ //將key與value互換位置
        (m._2,m._1)
      }).take(3) //取前3名


      for(elem<-result){
        println(elem._1+"  "+elem._2)
      }
      searchwordRDD //注意返回值
    })
    transformDStream.print()


    ssc.start()
    ssc.awaitTermination()
  }

}

4.

最近在玩spark streaming, 感覺到了他的強大。 然後看 StreamingContext的原始碼去理解spark是怎麼完成計算的。 大部分的原始碼比較容易看懂, 但是這個 
reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 
還是花了不少時間。 主要還是由於對spark不熟悉造成的吧, 還好基本弄明白了。 

總的來說SparkStreaming提供這個方法主要是出於效率考慮。 比如說我要每10秒計算一下前15秒的內容,(每個batch 5秒), 可以想象每十秒計算出來的結果和前一次計算的結果其實中間有5秒的時間值是重複的。 
那麼就是通過如下步驟 
1. 儲存上一個window的reduce值 
2.計算出上一個window的begin 時間到 重複段的開始時間的reduce 值 =》 oldRDD 
3.重複時間段的值結束時間到當前window的結束時間的值 =》 newRDD 
4.重複時間段的值等於上一個window的值減去oldRDD 

這樣就不需要去計算每個batch的值, 只需加加減減就能得到新的reduce出來的值。 

從程式碼上面來看, 入口為: 
reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 

一步一步跟蹤進去, 可以看到實際的業務類是在ReducedWindowedDStream 這個類裡面: 
程式碼理解就直接拿這個類來看了: 主要功能是在compute裡面實現, 通過下面程式碼回撥mergeValues 來計算最後的返回值 

Scala程式碼   收藏程式碼
  1. val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]  
  2.       .mapValues(mergeValues)  


先計算oldRDD 和newRDD 

//currentWindow  就是以當前時間回退一個window的時間再向前一個batch 到當前時間的視窗 程式碼裡面有一個圖很有用: 
我們要計算的new rdd就是15秒-25秒期間的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值 

然後最終結果是 重複區間(previous window的值 - oldRDD的值) =》 也就是中間重複部分, 再加上newRDD的值, 這樣的話得到的結果就是10秒到25秒這個時間區間的值 

Scala程式碼   收藏程式碼
  1. // 0秒                  10秒     15秒                25秒  
  2. //  _____________________________  
  3. // |  previous window   _________|___________________  
  4. // |___________________|       current window        |  --------------> Time  
  5. //                     |_____________________________|  
  6. //  
  7. // |________ _________|          |________ _________|  
  8. //          |                             |  
  9. //          V                             V  
  10. //       old RDDs                     new RDDs  
  11. //  




Scala程式碼   收藏程式碼
  1. val currentTime = validTime  
  2.   
  3.   
  4.     val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,  
  5.       currentTime)  
  6.     val previousWindow = currentWindow - slideDuration  
  7.   
  8.    val oldRDDs =  
  9.       reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)  
  10.     logDebug("# old RDDs = " + oldRDDs.size)  
  11.   
  12.     // Get the RDDs of the reduced values in "new time steps"  
  13.     val newRDDs =  
  14.       reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)  
  15.     logDebug("# new RDDs = " + newRDDs.size)  


得到newRDD和oldRDD後就要拿到previous windows的值: 如果第一次沒有previous window那麼建一個空RDD, 為最後計算結果時 arrayOfValues(0).isEmpty 鋪墊 
Java程式碼   收藏程式碼
  1. val previousWindowRDD =  
  2.       getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))  


然後把所有的值放到一個數組裡面 0是previouswindow, 1到oldRDD.size是oldrdd, oldRDD.size到newRDD.size是newrdd 

Scala程式碼   收藏程式碼
  1. val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs  


將每個RDD的(K,V) 轉變成(K, Iterator(V))的形式: 

比如說有兩個值(K,a) 和(K,b) 那麼coGroup後就會成為(K, Iterator(a,b))這種形式 

Java程式碼   收藏程式碼
  1. val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],  
  2.      partitioner)  


進行最後的計算: 
Scala程式碼   收藏程式碼
  1.  val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {  
  2. ...  
  3.   
  4. }  


首先判斷RDD的value數量是不是正確 previous window因為已經計算過所以只有一組值 
正確值為 1 (previous window value) + numOldValues (oldRDD 每個RDD的value) + numNewValues (newRDD 每個RDD的value) 

Java程式碼   收藏程式碼
  1. if (arrayOfValues.size != 1 + numOldValues + numNewValues) {  
  2.   throw new Exception("Unexpected number of sequences of reduced values")  
  3. }  


接下來取出oldRDD的值和newRDD的值: 
Scala程式碼   收藏程式碼
  1. val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)  
  2. val newValues =  
  3.        (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)  


如果previous window是空的, 那麼就直接計算newRDD的值(這也是為什麼每次計算時候第一次打出來的值都比較少, 因為他只有newRDD部分沒有重合部分, 也就是隻有10秒的內容而不是15秒) 

Scala程式碼   收藏程式碼
  1. if (arrayOfValues(0).isEmpty) {  
  2.        // If previous window's reduce value does not exist, then at least new values should exist  
  3.        if (newValues.isEmpty) {  
  4.          throw new Exception("Neither previous window has value for key, nor new values found. " +  
  5.            "Are you sure your key class hashes consistently?")  
  6.        }  
  7.        // Reduce the new values  
  8.        newValues.reduce(reduceF) // return  
  9.      }  


如果有previous window的值, 那麼先存到tempValue, 如果有oldRDD那麼減去oldRDD, 如果有newRDD (一般都有) 那麼加上newRDD的值 這樣就組成上圖裡面10到25秒區間的值了 

Scala程式碼   收藏程式碼
  1. else {  
  2.         // Get the previous window's reduced value  
  3.         var tempValue = arrayOfValues(0).head  
  4.         // If old values exists, then inverse reduce then from previous value  
  5.         if (!oldValues.isEmpty) {  
  6.           tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))  
  7.         }  
  8.         // If new values exists, then reduce them with previous value  
  9.         if (!newValues.isEmpty) {  
  10.           tempValue = reduceF(tempValue, newValues.reduce(reduceF))  
  11.         }  
  12.         tempValue // return  
  13.       }  


最後如果有filter的function的話就filter一下: 
Scala程式碼   收藏程式碼
  1. if (filterFunc.isDefined) {  
  2.       Some(mergedValuesRDD.filter(filterFunc.get))  
  3.     } else {  
  4.       Some(mergedValuesRDD)  
  5.     }  

參考:

http://humingminghz.iteye.com/blog/2308231

http://www.cnblogs.com/zDanica/p/5471592.html

https://yq.aliyun.com/articles/60315

http://www.medsci.cn/article/show_article.do?id=87c455942b8