1. 程式人生 > >DStream 轉換操作------有狀態轉換操作

DStream 轉換操作------有狀態轉換操作

wordcount map rds set def 一次 .so pac 如果

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object DStream_轉換操作 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("轉換操作").setMaster("local[2]")
    val sc=new StreamingContext(conf,Seconds(4))
    val lines=sc.socketTextStream("
localhost",8899) sc.checkpoint("file:///usr/local2/spark/mycode/kafa3/checkpoint") val words=lines.flatMap(x=>x.split(" ")) val wordsStream=words.map(x=>(x,1)) 3.val stateStream=wordsStream.updateStateByKey[Int](update) sc.checkpoint("file:///usr/local2/spark/mycode/kafa2/checkpoint") 1.
//val wordCount=words.map(x=>(x,1)).reduceByKeyAndWindow(_+_,_-_,Seconds(16),Seconds(4),2)//DStream有狀態轉換操作 2. val wordCount=words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(16),Seconds(4),2) wordCount.print(100) stateStream.print() sc.start() sc.awaitTermination() } val update
=(values:Seq[Int],state:Option[Int])=>{ val currentCount=values.foldLeft(0)(_+_) val previousCount= state.getOrElse(0) Some(currentCount+previousCount) } }

註意:

reduceByKeyAndWindow中的Seconds(16)是滑動窗口長度,Seconds(4)是滑動窗口時間間隔(每隔多長時間滑動一次窗口)這兩個值必須是 new StreamingContext(conf,Seconds(4)) 中Seconds(4)的倍數(>=1)
如果第二個4<滑動窗口時間間隔 程序結果的時間線就變成了以滑動窗口時間間隔為準
1,2,3區別:
1.會保留歷史對象的名字列表
2.不會保留
3.在歷史值的基礎上累加,但(1,2)會隨著窗口滑動,所有對象的值會變為0
4.(1和2適合統計實時時間段內詞頻)

DStream 轉換操作------有狀態轉換操作