1. 程式人生 > >spark streaming中WordCount

spark streaming中WordCount

通過一些簡單的案例,可以知道一些大致的用法

1.對每一個批次的資料進行操作:

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object sparkStreamingWC {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("sparkStreamingWC").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    //建立sparkstreaming入口的物件,也就是Streaming物件
    //資料批處理,要設定時間間隔,每5秒產生一次批次資料,名叫batch
    val ssc:StreamingContext=new StreamingContext(sc,Seconds(5))

    // 首先,建立輸入DStream,代表了一個從資料來源(比如kafka、socket)來的持續不斷的實時資料流
    // socketTextStream()方法接收兩個基本引數,第一個是監聽哪個主機上的ip,第二個是監聽哪個埠
    //從NetCat服務裡獲取資料。ReceiverInputDStream接收,裡面是String
    val dstream: ReceiverInputDStream[String] =ssc.socketTextStream("192.168.88.130",8888)
    //按行讀取

    //返回的是DStream[(String, Int)],對批次進行處理,產生的是每一個批次的結果
    //呼叫DStream裡的api進行計算
    val res: DStream[(String, Int)] =dstream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    //不新增引數預設列印RDD中前10個元素
    res.print()

    //提交任務到叢集
    ssc.start()

    //執行緒等待下一批處理任務
    ssc.awaitTermination()
  }
}
2.updateStateByKey按批次累加
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

//實現批次資料的累加
object sparkStreamingWC2 {
  /*
* 實現按批次累加功能,需要呼叫updateStateByKey
* 其中需要自定義一個函式,該函式是對歷史結果資料和當前批次資料的操作過程
* 該函式中第一個引數代表每個單詞
* 第二個引數代表當前批次單詞出現的次數:Seq(1,1,1,1)
* 第三個引數代表之前批次累加的結果,可能有值,也可能沒有值,所以在獲取的時候要用getOrElse方法
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("sparkStreamingWC2").setMaster("local[*]")

    val ssc = new StreamingContext(sparkConf,Milliseconds(10000))

    //設定檢查點,存已經處理的歷史資料,因為要累加,所以要存歷史資料
    ssc.checkpoint("D:\\資料\\spark")
    //獲取資料
    val dstram=ssc.socketTextStream("192.168.88.130",8888)
    //處理當前的資料得到一個集合
    val tup=dstram.flatMap(_.split(" ")).map((_,1))
    //將之前的資料累加過來  1.更新的函式2.分割槽器,將不同RDD的資料放到一個分割槽 3.是否記錄當前的分割槽器
    val res = tup.updateStateByKey(func,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
  //迭代器裡面是要處理的資料
  //自定義一個函式,第一個是處理的K值,第二個是當前K對於的V的序列集合,歷史記錄當前單詞出現的次數,第三個引數是之前累加的結果,也是當前對應的K
  val func =(it:Iterator[(String, Seq[Int], Option[Int])]) =>{
    it.map(x=>{
      (x._1,x._2.sum+x._3.getOrElse(0))
    })
  }
}

這裡是從NetCat伺服器裡面獲取資料,如果沒有可以使用下面的yum 的安裝

 

yum -y install nc 

開啟埠
nc -lk 8888
l:代表 netcat 將以監聽模式執行
k:代表示 nc 在接收完一個請求後不會立即退出,而是會繼續監聽其他請求
這時就可以請求該介面了, nc 會把請求報文輸出到標準輸出。