第一個Spark Streaming案例程式
阿新 • • 發佈:2018-12-18
前面的文章大概的介紹了Spark Streaing流式處理框架,說的通俗點,實際上就是在Spark Core的基礎上進行了封裝,然後將小批次的資料進行處理,處理完了程序並不會停止,而是會一直存在,這樣只要有資料進來,就會進行處理,從而實現了流式處理
下面就來一個例項進行感受:
這裡選擇使用Linux下的nc工具來產生socket資料,然後Spark Streaming來進行處理
1.首先放出Spark Streaing的程式碼:
package cn.ysjh import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkContext} object SparkStreamingTest { def main(args: Array[String]): Unit = { //實時計算要建立StreamingContext,StreamingContext是對SparkContext的封裝, val conf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[4]") val context: SparkContext = new SparkContext(conf) //第二個引數是小批次產生的時間間隔,Milliseconds是毫秒 val streaming: StreamingContext = new StreamingContext(context,Milliseconds(5000)) //有了SparkContext就可以建立SparkStreaming的抽象DStream //從一個socket埠中讀取資料 val lines: ReceiverInputDStream[String] = streaming.socketTextStream("192.168.220.134",6789) //對DStream進行操作,你操作這個抽象(代理,描述),就像操作一個本地集合一樣 val words: DStream[String] = lines.flatMap(_.split(" ")) val wordOne: DStream[(String, Int)] = words.map((_,1)) val result: DStream[(String, Int)] = wordOne.reduceByKey(_+_) //列印結果 result.print() //啟動Spark Streaming程式 streaming.start() //等待優雅的退出 streaming.awaitTermination() } }
2.在一臺虛擬機器中安裝nc,在實際的系統中安裝nc的命令不同
Ubuntu:
apt-get install netcat
Centos:
yum install nc
3.在IDEA中執行Spark Streaming程式
4.在安裝了nc工具包的虛擬機器上輸入:
nc -lk 6789
然後隨便輸入資料,以空格隔開,就會在IDEA中看到執行的結果了,只要輸入一條資料就會計算統計出一次結果,而且是每隔5秒輸出一次
執行結果示例:
可以很明確的看出Spark Streaming就是一直在執行的Spark程式