Spark Streaming示例(scala篇)
阿新 • • 發佈:2019-02-17
本段程式碼運行於Intellij IDEA中,與linux 中nc進行互動
1.Scala程式碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]) {
//程式在執行時receiver會獨佔一個執行緒,所以streaming程式至少要兩個執行緒,防止starvation scenario
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
//所有流功能的主要入口
val ssc: StreamingContext = new StreamingContext(conf , Seconds(5))
//指定從TCP源資料流的離散流,接收到的每一行資料都是一行文字
val stream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop-1707-003",6666)
//將接收到的文字壓平,轉換,聚合
val dStream: DStream[(String, Int)] = stream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
dStream.print()
// Spark Streaming 只有建立在啟動時才會執行計算,在它已經開始之後,並沒有真正地處理。
//---------------------------------------
//啟動計算
ssc.start()
//等待計算終止
ssc.awaitTermination()
//true 會把內部的sparkcontext同時停止
//false 只會停止streamingcontext 不會停sparkcontext
ssc.stop(true)
}
}
2.linux中nc
2.1.下載nc
2.2.解壓nc
rpm -ivh nc-1.84-22.el6.x86_64.rpm
2.3.開啟nc命令
nc -lk 6666
3.執行scala程式碼,並在nc上輸入
執行結果如圖: