程式人生 > spark streaming實時計數

spark streaming實時計數

● 安裝 nc(netcat):在 Linux 終端視窗中可以直接使用 yum 工具進行安裝:

[root@hadoop-01 ~]# yum install -y nc

● 傳送資料:用 nc 在 8866 埠監聽(-l 表示監聽,-k 表示客戶端斷開後繼續監聽),之後在此終端輸入的每一行文字都會被傳送給 Streaming 程式:

[root@hadoop-01 ~]# nc -lk 8866

● 使用Streaming實時計數(程式連線 hadoop-01:8866,每 5 秒輸出一次該批次內各單詞的出現次數):

package com.ws.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Real-time word count over a text socket stream.
  *
  * Reads lines from a TCP socket (e.g. one opened with `nc -lk 8866`), splits each
  * line into whitespace-separated words and prints the word counts of every
  * micro-batch.
  */
object StreamingWc {

  /** Socket host used when no host argument is given. */
  private val DefaultHost = "hadoop-01"

  /** Socket port used when no port argument is given. */
  private val DefaultPort = 8866

  /** Length of each micro-batch, in milliseconds. */
  private val BatchIntervalMs = 5000L

  /**
    * Entry point.
    *
    * @param args optional `[host] [port]` of the socket text source; when omitted
    *             they default to `hadoop-01` and `8866`
    * @throws NumberFormatException if the port argument is not an integer
    */
  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse(DefaultHost)
    val port = args.lift(1).fold(DefaultPort)(_.toInt)

    // Batch jobs create a SparkContext; real-time jobs wrap it in a StreamingContext.
    // The socket receiver permanently occupies one core, so the local master needs
    // at least two cores or no batch will ever be processed.
    val conf = new SparkConf()
      .setAppName("StreamingWc")
      .setMaster("local[*]")
      // On JVM shutdown (Ctrl+C / SIGTERM), finish processing already-received data
      // instead of dropping it, so the program actually exits gracefully.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val sc = new SparkContext(conf)

    // StreamingContext wraps the SparkContext and adds micro-batch scheduling;
    // the second argument is the interval at which batches are produced.
    val ssc = new StreamingContext(sc, Milliseconds(BatchIntervalMs))

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream(host, port)

    // Split on runs of whitespace and drop empty tokens: splitting on a single
    // space would count "" as a word for repeated or leading spaces.
    val words: DStream[String] = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)

    val counts: DStream[(String, Int)] = words.map((_, 1)).reduceByKey(_ + _)

    counts.print()

    // Start receiving and processing data.
    ssc.start()
    // Block the main thread until the streaming context is stopped or fails.
    ssc.awaitTermination()
  }
}