Spark Streaming--2: Custom Data Source
阿新 · Published 2019-01-05
You define a custom data source by subclassing Receiver and implementing its onStart and onStop methods. Note that onStart must return quickly, so the blocking read loop belongs on its own thread; each received record is handed to Spark with the inherited store method (see the full example below).
To test it, you need to open a socket server yourself and type lines into it. Run the listener on the master host (the program below connects to master:8888):

nc -lk 8888
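Once both the listener and the streaming job below are running, every line you type into the nc session becomes one record for the receiver. For example, you might type:

hello world
hello spark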
package com.jiangnan.spark

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

class TestSparkStreamCustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.DISK_ONLY) {

  // Called once when the receiver starts. It must not block,
  // so the read loop runs on its own thread.
  override def onStart(): Unit = {
    println("receiver started")
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Called when the receiver is stopped
  override def onStop(): Unit = {
    println("receiver stopped")
  }

  // Connect to the socket and read its input stream line by line
  private def receive(): Unit = {
    val socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    var line = reader.readLine()
    while (!isStopped() && line != null) {
      // hand the received line to Spark via the parent class's store method
      store(line)
      // read the next line
      line = reader.readLine()
    }
    reader.close()
    socket.close()
  }
}

object TestSparkStreamCustomReceiver extends App {
  // Spark configuration; local[2] because the receiver itself occupies one thread
  val conf = new SparkConf()
    .setAppName("TestSparkStreamCustomReceiver")
    .setMaster("local[2]")
  // create a StreamingContext with a 5-second batch interval
  val ssc = new StreamingContext(conf, Seconds(5))
  // receive data from the socket through the custom receiver
  val lineDStream = ssc.receiverStream(new TestSparkStreamCustomReceiver("master", 8888))
  // word count over each batch
  val res = lineDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  res.print()
  // start the job and block until it terminates
  ssc.start()
  ssc.awaitTermination()
}
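To build and run this, the Spark Streaming artifacts must be on the classpath. A minimal build.sbt sketch, assuming Scala 2.11 and Spark 2.4.0 (both versions are assumptions, not specified in the original post; adjust them to match your cluster):

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // spark-core and spark-streaming are required by the imports above
  "org.apache.spark" %% "spark-core"      % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0"
)

With the sample input above, and assuming both lines land in the same 5-second batch, res.print() would emit output roughly like:

-------------------------------------------
Time: 1546650000000 ms
-------------------------------------------
(hello,2)
(world,1)
(spark,1)

Counts are per batch, not cumulative, because reduceByKey here runs independently on each batch.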