spark streaming DStream運算元大全
DStream作為spark 流處理的資料抽象,有三個主要的特徵:
1.依賴的DStream的列表
2.DStream生成RDD的時間間隔
3.用來生成RDD的方法
本篇pom.xml檔案spark streaming版本為1.6.0
目錄
window()
生成新的DStream
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
new WindowedDStream(this, windowDuration, slideDuration)
}
兩個過載方法,第一個只傳視窗長度(滑動間隔預設為例項化ssc時的時間間隔),第二個傳視窗長度與滑動間隔
val ssc=new StreamingContext(sc,Seconds(1))//時間間隔為1s val stream=xxxx //非重點,省略 stream.print() stream.window(Seconds(4),Seconds(4)).print() stream.window(Seconds(4),Seconds(5)).print()
第一次print():是每秒列印一次這1秒內接收的資料
第二次print():每4秒列印前4秒接收的資料
第三次print():每5秒列印最近4秒接收的資料 ,上個5秒間隔,第一秒內的資料不會列印
reduceByWindow()
生成新的DStream,作用於key-value型別
def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) }
需要傳3個引數,依次為reduce()方法,視窗長度,滑動長度。
該方法的主要過程是:先將時間間隔內的資料呼叫reduce()運算元聚合,然後調window()生成新的DStream,再將各間隔聚合完的結果聚合。
val ssc=new StreamingContext(sc,Seconds(1))//時間間隔
val stream=xxxx //型別為DStream[String]
stream.print()
stream.reduce((s1,s2)=>{
s1+":"+s2
}).print()
stream.reduceByWindow((s1,s2)=>{
s1+":"+s2
},Seconds(60),Seconds(10)).print()
第一次print():每秒列印一次接收的資料
第二次print():每秒列印一次,會將每秒接收到的資料拼接成起來
第三次print():每10秒列印一次,列印最近一分鐘接收的資料,並拼接
countByWindow()
生成新的DStream
def countByWindow(
windowDuration: Duration,
slideDuration: Duration): DStream[Long] = ssc.withScope {
//reduceByWindow()第二各方法_+_為逆函式
this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
需要兩個引數,依次為:視窗長度,滑動間隔
該方法的主要過程為:先將每個元素生成長整數1,然後呼叫reduceByWindow()運算元,將每個元素值相加。
val ssc=new StreamingContext(sc,Seconds(1))//時間間隔為1s
val stream=xxxx
stream.print()
stream.count().print()
stream.countByWindow(Seconds(10),Seconds(2)).print()
第一次print():每秒列印一次接收的資料
第二次print():每秒列印一次接收到元素的數量
第三次print():每2秒列印一次最近10秒接收到元素的數量
countByValueAndWindow()
生成新的DStream
def countByValueAndWindow(
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int = ssc.sc.defaultParallelism)
(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {
this.map(x => (x, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
windowDuration,
slideDuration,
numPartitions,
(x: (T, Long)) => x._2 != 0L
)
}
需要三個引數,依次為:視窗長度,滑動間隔,分割槽數(有預設值,可不傳)
該方法的主要過程為:先將每個元素生成(元素,1L),然後呼叫reduceByKeyAndWindow(),可以理解為按key聚合,統計每個key的次數,也就是統計每個元素的次數
val ssc=new StreamingContext(sc,Seconds(1))
val stream=xxxx
stream.print()
stream.countByValue().print()
data.countByValueAndWindow(Seconds(10),Seconds(2)).print()
第一次print():每秒列印一次接收的資料
第二次print():每秒列印一次,會統計每個元素的次數
第三次print():每2秒列印最近10秒的資料,統計每個元素次數
reduceByKeyAndWindow()
生成新的DStream
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration
): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
}
該方法有6個過載方法,就不一一粘了,引數為:聚合函式,視窗長度
該方法主要過程為:調reduceByKey()函式
val ssc=new StreamingContext(sc,Seconds(1))
val stream=xxxx //stream型別為[String,Long]
stream.print()
data.reduceByKey((c1,c2)=>{
c1+c2
}).print()
data.reduceByKeyAndWindow((c1,c2)=>{
c1+c2
},Seconds(10)).print()
第一次print():每秒列印一次接收的資料
第二次print():每秒列印一次,計算每個key的次數
第三次print():每秒列印一次最近10秒每個key的次數
updatestateByKey()
生成新的DStream
所有的window操作都是計算長度為視窗長度的資料,非window操作都是計算設定的時間間隔內的資料,而updateBykey可以理解成在無限長的時間裡,為每個key儲存一個狀態,這個時間長度可以通過ssc.awaitTerminationOrTimeout()來控制,一般來說長度每天或每小時。
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
當然,該方法過載方法也6個,這裡只討論上面的,傳入一個更新方法,該方法兩個引數:一個為當前時間間隔內資料,型別為Seq,一個為之前的資料,可能無資料(第一次提交計算的時候),型別為Option,返回值也為Option型別
下面是兩個例項,求pv和uv
//pv
val stream=xxxx//型別得轉化為[當前日期,1L]
stream.updateStateByKey((curvalues:Seq[Long],prevalue:Option[Long])=>{
val sum=curvalues.sum
val pre=prevalue.getOrElse(0L)
Some(sum+pre)
})
//uv 因為uv涉及到去重,故將userid放入Set裡
val stream=xxxx //型別為[當前日期,userid]
stream.updateStateByKey((curvalues:Seq[Set[String]],prevalue:Option[Set[String]])=>{
var curs=Set[String]()
if(!curvalues.isEmpty){
curs=curvalues.reduce(_++_)//將兩個Set集合合併
}
Some(curs++prevalue.getOrElse(Set[String]()))
})
未完待續。。。。