1. 程式人生 > >spark streaming DStream運算元大全

spark streaming DStream運算元大全

DStream作為spark 流處理的資料抽象,有三個主要的特徵:

1.依賴的DStream的列表

2.DStream生成RDD的時間間隔

3.用來生成RDD的方法

本篇pom.xml檔案spark streaming版本為1.6.0

目錄

window()

reduceByWindow()

countByWindow()

countByValueAndWindow()

reduceByKeyAndWindow()

updatestateByKey()


window()

生成新的DStream

def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    new WindowedDStream(this, windowDuration, slideDuration)
 }

兩個過載方法,第一個只傳視窗長度(滑動間隔預設為例項化ssc時的時間間隔),第二個傳視窗長度與滑動間隔

val ssc=new StreamingContext(sc,Seconds(1))//時間間隔為1s
val stream=xxxx //非重點,省略

stream.print()
stream.window(Seconds(4),Seconds(4)).print()
stream.window(Seconds(4),Seconds(5)).print()

第一次print():是每秒列印一次這1秒內接收的資料

第二次print():每4秒列印前4秒接收的資料

第三次print():每5秒列印最近4秒接收的資料 ,上個5秒間隔,第一秒內的資料不會列印

reduceByWindow()

生成新的DStream,作用於key-value型別

  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
  }

需要傳3個引數,依次為reduce()方法,視窗長度,滑動長度。

該方法的主要過程是:先將時間間隔內的資料呼叫reduce()運算元聚合,然後調window()生成新的DStream,再將各間隔聚合完的結果聚合。

val ssc=new StreamingContext(sc,Seconds(1))//時間間隔
val stream=xxxx //型別為DStream[String]

stream.print()
stream.reduce((s1,s2)=>{
    s1+":"+s2
}).print()
stream.reduceByWindow((s1,s2)=>{
    s1+":"+s2
},Seconds(60),Seconds(10)).print()

第一次print():每秒列印一次接收的資料

第二次print():每秒列印一次,會將每秒接收到的資料拼接成起來

第三次print():每10秒列印一次,列印最近一分鐘接收的資料,並拼接

countByWindow()

生成新的DStream

 def countByWindow(
      windowDuration: Duration,
      slideDuration: Duration): DStream[Long] = ssc.withScope {
    //reduceByWindow()第二各方法_+_為逆函式
    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
  }

需要兩個引數,依次為:視窗長度,滑動間隔

該方法的主要過程為:先將每個元素生成長整數1,然後呼叫reduceByWindow()運算元,將每個元素值相加。

val ssc=new StreamingContext(sc,Seconds(1))//時間間隔為1s
val stream=xxxx

stream.print()
stream.count().print()
stream.countByWindow(Seconds(10),Seconds(2)).print()

第一次print():每秒列印一次接收的資料

第二次print():每秒列印一次接收到元素的數量

第三次print():每2秒列印一次最近10秒接收到元素的數量

countByValueAndWindow()

生成新的DStream

   def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int = ssc.sc.defaultParallelism)
      (implicit ord: Ordering[T] = null)
      : DStream[(T, Long)] = ssc.withScope {
    this.map(x => (x, 1L)).reduceByKeyAndWindow(
      (x: Long, y: Long) => x + y,
      (x: Long, y: Long) => x - y,
      windowDuration,
      slideDuration,
      numPartitions,
      (x: (T, Long)) => x._2 != 0L
    )
  }

需要三個引數,依次為:視窗長度,滑動間隔,分割槽數(有預設值,可不傳)

該方法的主要過程為:先將每個元素生成(元素,1L),然後呼叫reduceByKeyAndWindow(),可以理解為按key聚合,統計每個key的次數,也就是統計每個元素的次數

val ssc=new StreamingContext(sc,Seconds(1))
val stream=xxxx

stream.print()
stream.countByValue().print()
data.countByValueAndWindow(Seconds(10),Seconds(2)).print()

第一次print():每秒列印一次接收的資料

第二次print():每秒列印一次,會統計每個元素的次數  

第三次print():每2秒列印最近10秒的資料,統計每個元素次數

reduceByKeyAndWindow()

生成新的DStream

def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration
    ): DStream[(K, V)] = ssc.withScope {
    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
  }

該方法有6個過載方法,就不一一粘了,引數為:聚合函式,視窗長度

該方法主要過程為:調reduceByKey()函式

val ssc=new StreamingContext(sc,Seconds(1))
val stream=xxxx  //stream型別為[String,Long]

stream.print()
data.reduceByKey((c1,c2)=>{
    c1+c2
  }).print()
data.reduceByKeyAndWindow((c1,c2)=>{
    c1+c2
  },Seconds(10)).print()

第一次print():每秒列印一次接收的資料

第二次print():每秒列印一次,計算每個key的次數

第三次print():每秒列印一次最近10秒每個key的次數

updatestateByKey()

生成新的DStream

所有的window操作都是計算長度為視窗長度的資料,非window操作都是計算設定的時間間隔內的資料,而updateBykey可以理解成在無限長的時間裡,為每個key儲存一個狀態,這個時間長度可以通過ssc.awaitTerminationOrTimeout()來控制,一般來說長度每天或每小時。

def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = ssc.withScope {
    updateStateByKey(updateFunc, defaultPartitioner())
  }

當然,該方法過載方法也6個,這裡只討論上面的,傳入一個更新方法,該方法兩個引數:一個為當前時間間隔內資料,型別為Seq,一個為之前的資料,可能無資料(第一次提交計算的時候),型別為Option,返回值也為Option型別

下面是兩個例項,求pv和uv

//pv
val stream=xxxx//型別得轉化為[當前日期,1L]
stream.updateStateByKey((curvalues:Seq[Long],prevalue:Option[Long])=>{
      val sum=curvalues.sum
      val pre=prevalue.getOrElse(0L)
      Some(sum+pre)
    })
//uv   因為uv涉及到去重,故將userid放入Set裡
val stream=xxxx //型別為[當前日期,userid]
stream.updateStateByKey((curvalues:Seq[Set[String]],prevalue:Option[Set[String]])=>{
      var curs=Set[String]()
      if(!curvalues.isEmpty){
        curs=curvalues.reduce(_++_)//將兩個Set集合合併
      }
      Some(curs++prevalue.getOrElse(Set[String]()))
    })

未完待續。。。。