Spark Streaming介紹以及案例
阿新 • • 發佈:2019-01-09
概觀
Spark Streaming是核心Spark API的擴充套件,可實現實時資料流的可擴充套件,高吞吐量,容錯流處理。
資料來源:Kafka,Flume,Kinesis或TCP套接字等,
可以使用高階函式進行復雜演算法進行處理map
,例如reduce
,join
和window
。
處理後的資料可以推送到檔案系統,資料庫等
它的工作原理:
Spark Streaming接收實時輸入資料流並將資料分成批處理,然後由Spark引擎處理以批量生成最終結果流
Spark Streaming提供稱為離散流或DStream的高階抽象,表示連續的資料流。DStream可以從來自Kafka,Flume和Kinesis等源的輸入資料流建立,也可以通過在其他DStream上應用高階操作來建立。在內部,DStream表示為一系列 RDD。
案例介紹
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("MyTest") val ssc = new StreamingContext(conf,Seconds(1)) //建立一個DStream來表示來自TCP源的流資料,指定為主機名(例如localhost)和埠(例如9999)。 //此linesDStream表示將從資料伺服器接收的資料流。DStream中的每條記錄都是一行文字 val lines = ssc.socketTextStream("localhost",9999) //flatMap是一對多DStream操作,它通過從源DStream中的每個記錄生成多個新記錄來建立新的DStream。在這種情況下,每行將被分成多個單詞,單詞流表示為wordsDStream。 val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word,1)) val wordCount = pairs.reduceByKey(_+_) wordCount.print() //Spark Streaming僅設定啟動時將執行的計算,並且尚未啟動實際處理。要在設定完所有轉換後開始處理 ssc.start() ssc.awaitTermination() } }
使用埠傳送資料:
nc -lk 9999
檢視埠使用情況:
lsof -i:9999