1. 程式人生 > >Spark Streaming示例(scala篇)

Spark Streaming示例(scala篇)

本段程式碼運行於Intellij IDEA中,與linux 中nc進行互動

1.Scala程式碼:


import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object StreamingWordCount {
  def main(args: Array[String]) {

    //程式在執行時receiver會獨佔一個執行緒,所以streaming程式至少要兩個執行緒,防止starvation scenario


    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")

//所有流功能的主要入口
    val ssc: StreamingContext = new StreamingContext(conf , Seconds(5))

//指定從TCP源資料流的離散流,接收到的每一行資料都是一行文字
    val stream: ReceiverInputDStream[String] =  ssc.socketTextStream("hadoop-1707-003",6666)

//將接收到的文字壓平,轉換,聚合

    val dStream: DStream[(String, Int)] =  stream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)

dStream.print()

// Spark Streaming 只有建立在啟動時才會執行計算,在它已經開始之後,並沒有真正地處理。
//---------------------------------------
//啟動計算
    ssc.start()
//等待計算終止
    ssc.awaitTermination()
    //true    會把內部的sparkcontext同時停止
    //false  只會停止streamingcontext  不會停sparkcontext

    ssc.stop(true)
  }

}

2.linux中nc

        2.1.下載nc

        2.2.解壓nc

        rpm  -ivh  nc-1.84-22.el6.x86_64.rpm

    2.3.開啟nc命令

        nc    -lk    6666

3.執行scala程式碼,並在nc上輸入

執行結果如圖: