spark streaming中WordCount
阿新 • • 發佈:2018-11-11
通過一些簡單的案例,可以知道一些大致的用法
1.對每一個批次的資料進行操作:
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object sparkStreamingWC { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("sparkStreamingWC").setMaster("local[*]") val sc = new SparkContext(sparkConf) //建立sparkstreaming入口的物件,也就是Streaming物件 //資料批處理,要設定時間間隔,每5秒產生一次批次資料,名叫batch val ssc:StreamingContext=new StreamingContext(sc,Seconds(5)) // 首先,建立輸入DStream,代表了一個從資料來源(比如kafka、socket)來的持續不斷的實時資料流 // socketTextStream()方法接收兩個基本引數,第一個是監聽哪個主機上的ip,第二個是監聽哪個埠 //從NetCat服務裡獲取資料。ReceiverInputDStream接收,裡面是String val dstream: ReceiverInputDStream[String] =ssc.socketTextStream("192.168.88.130",8888) //按行讀取 //返回的是DStream[(String, Int)],對批次進行處理,產生的是每一個批次的結果 //呼叫DStream裡的api進行計算 val res: DStream[(String, Int)] =dstream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) //不新增引數預設列印RDD中前10個元素 res.print() //提交任務到叢集 ssc.start() //執行緒等待下一批處理任務 ssc.awaitTermination() } }
2.updateStateByKey按批次累加
import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} //實現批次資料的累加 object sparkStreamingWC2 { /* * 實現按批次累加功能,需要呼叫updateStateByKey * 其中需要自定義一個函式,該函式是對歷史結果資料和當前批次資料的操作過程 * 該函式中第一個引數代表每個單詞 * 第二個引數代表當前批次單詞出現的次數:Seq(1,1,1,1) * 第三個引數代表之前批次累加的結果,可能有值,也可能沒有值,所以在獲取的時候要用getOrElse方法 */ def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("sparkStreamingWC2").setMaster("local[*]") val ssc = new StreamingContext(sparkConf,Milliseconds(10000)) //設定檢查點,存已經處理的歷史資料,因為要累加,所以要存歷史資料 ssc.checkpoint("D:\\資料\\spark") //獲取資料 val dstram=ssc.socketTextStream("192.168.88.130",8888) //處理當前的資料得到一個集合 val tup=dstram.flatMap(_.split(" ")).map((_,1)) //將之前的資料累加過來 1.更新的函式2.分割槽器,將不同RDD的資料放到一個分割槽 3.是否記錄當前的分割槽器 val res = tup.updateStateByKey(func,new HashPartitioner(ssc.sparkContext.defaultParallelism),true) res.print() ssc.start() ssc.awaitTermination() } //迭代器裡面是要處理的資料 //自定義一個函式,第一個是處理的K值,第二個是當前K對於的V的序列集合,歷史記錄當前單詞出現的次數,第三個引數是之前累加的結果,也是當前對應的K val func =(it:Iterator[(String, Seq[Int], Option[Int])]) =>{ it.map(x=>{ (x._1,x._2.sum+x._3.getOrElse(0)) }) } }
這裡是從NetCat伺服器裡面獲取資料,如果沒有可以使用下面的yum 的安裝
yum -y install nc
開啟埠
nc -lk 8888
l:代表 netcat 將以監聽模式執行
k:代表示 nc 在接收完一個請求後不會立即退出,而是會繼續監聽其他請求
這時就可以請求該介面了, nc 會把請求報文輸出到標準輸出。