spark streaming實時計數
阿新 • 發佈:2018-11-10
● 安裝nc(netcat):在Linux終端視窗可以直接使用yum工具進行安裝:
[root@hadoop-01 ~]# yum install -y nc
● 傳送資料:啟動nc監聽8866埠(-l 為監聽模式,-k 表示連線關閉後繼續監聽),之後在此終端輸入的每一行文字都會傳送給Streaming程式:
[root@hadoop-01 ~]# nc -lk 8866
● 使用Streaming實時計數
package com.ws.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Real-time word count over a TCP text socket.
 *
 * Usage: `StreamingWc [host] [port] [batchIntervalMs]`
 *
 * Defaults: host `hadoop-01`, port `8866`, batch interval `5000` ms, so running
 * with no arguments behaves like the original hard-coded version. Feed it text
 * with e.g. `nc -lk 8866` on the host.
 *
 * Counts are computed independently for each micro-batch (`reduceByKey` on a
 * DStream). Each printed result therefore reflects only the words received
 * during that interval, not a running total since startup.
 */
object StreamingWc {

  private val DefaultHost: String = "hadoop-01"
  private val DefaultPort: Int = 8866
  private val DefaultBatchIntervalMs: Long = 5000L

  def main(args: Array[String]): Unit = {
    // Optional positional overrides; a malformed number fails fast with NumberFormatException.
    val host: String = args.lift(0).getOrElse(DefaultHost)
    val port: Int = args.lift(1).map(_.toInt).getOrElse(DefaultPort)
    val batchIntervalMs: Long = args.lift(2).map(_.toLong).getOrElse(DefaultBatchIntervalMs)

    // Batch jobs create a SparkContext; real-time jobs wrap it in a StreamingContext.
    // NOTE(review): a receiver-based stream (socketTextStream) permanently occupies one
    // core, so the local master needs at least 2 cores. On a single-core machine,
    // "local[*]" leaves nothing for processing.
    val conf = new SparkConf()
      .setAppName("StreamingWc")
      .setMaster("local[*]")
      // Makes the JVM shutdown hook (e.g. Ctrl-C) stop the StreamingContext gracefully,
      // letting received data finish processing instead of being dropped.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val sc = new SparkContext(conf)

    // StreamingContext wraps the SparkContext to add streaming; the second
    // argument is the interval at which micro-batches are produced.
    val ssc = new StreamingContext(sc, Milliseconds(batchIntervalMs))

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream(host, port)

    // Split on runs of whitespace and drop empty tokens. Splitting on a single " "
    // would yield "" for leading/repeated spaces and count it as a word.
    val words: DStream[String] = lines.flatMap(line => line.split("\\s+").filter(_.nonEmpty))
    val pairs: DStream[(String, Int)] = words.map(word => (word, 1))
    val counts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)
    counts.print()

    // Start the Spark Streaming job.
    ssc.start()
    // Block the main thread until the context is stopped (e.g. by the shutdown hook).
    ssc.awaitTermination()
  }
}