1. 程式人生 > 其它 >Spark Streaming應用

Spark Streaming應用

技術標籤:Spark大資料sparkhdfshadoop

Spark Streaming應用

實驗目的

深入理解和掌握Spark Stream中DStream無狀態操作的方法;理解Spark Stream程式設計解決實際問題的方法。

實驗要求

  1. 掌握基於Spark Stream的Scala和Spark SQL程式設計環境配置;
  2. 掌握Spark Stream中DStream無狀態操作程式設計方法。

實驗內容

  1. 參考實驗二建立一個Spark專案
  2. 在Maven中配置Spark Streaming程式設計環境,pom.xml中新增:
<dependency>
	<groupId>
org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.0.1</version> </dependency>

若自己安裝的Spark不是3.0.1,則自己搜尋適合的spark-streaming版本

  1. 將MyReceiver.scala新增到專案中
    MyReceiver生成的流資料格式:每次生成num個元素(視構造方法傳入的引數而定),每個元素均是為字串,包含了3個100以內的隨機數,每個隨機數用空格隔開,例:
10 42 80
78 1 7
8 14 8
33 21 2
52 5 5
  1. 編寫MyStreaming.scala中的main方法以實現流處理

(1) 建立SparkContext和StreamingContext,設定時間間隔為3秒(參見PPT第6章3.3)

val conf = new SparkConf()
conf.setAppName("名字")
val sc = new SparkContext(conf)
val ssc = new SteamingContext(sc, Seconds(3))

(2) 使用receiverStream方法定義輸入源,採用自定義MyReceiver輸入源:

//lines為輸入源

val lines = ssc.receiverStream(new MyReceiver(5))

(3) 使用DStream的無狀態轉換運算元,計算輸入流lines中的資料;計算每個batch的所有數字的平均值
//對輸入流lines應用DStream轉換運算元提示,ds為轉換後的DStream

val ds = lines.map(x=>x.split(regx=" ").map(x=>x.toInt)).map(x=>x.sum/5)

(4) 使用foreachRDD儲存輸入流資料:
//遍歷DStream的所有RDD

lines.foreachRDD(x => {
if(x.count() > 0) {
//使用RDD的saveAsTextFile儲存RDD資料,儲存目錄以時間戳命名
x.saveAsTextFile("file:///root/rdds/rec" + new Date().getTime.toString)
}
})

(5) 使用foreachRDD儲存計算後的流資料:參考第(4)步程式碼,儲存目錄設定為/root/result/rec……(省略號部分用時間戳代替))

(6) 設定執行過程中列印資訊scc.print;使用scc.start啟動spark streaming,並設定結束條件scc.awaitTermination

  1. 編譯和執行:
    按實驗二中的方法編譯打包並執行程式,Spark Streaming需要手動停止,執行一段時間後可按下ctrl+z強行停止
  2. 檢視結果:
    開啟spark-shell,使用sc.textFile(“file:///root/rdds/") 讀取所有儲存的DSteam輸入源資料並顯示;使用sc.textFile("file:///root/result/”) 讀取所有儲存的計算結果資料並顯示。(注意:路徑後加*,表示提取所有子目錄中的檔案)
    在這裡插入圖片描述
    在這裡插入圖片描述

儲存結果預覽
在這裡插入圖片描述
在這裡插入圖片描述

程式碼

package cn.edu.swpu.scs
import java.util.Date
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Streaming{
//當Receiver啟動時呼叫onStart方法
//一次啟動寫入到DStream的一個RDD的分割槽中
def main(args: Array[String]): Unit = {
// 建立SparkContext和StreamingContext,設定時間間隔為3秒  
val conf = new SparkConf()  
conf.setAppName("My Streaming")  
val sc = new SparkContext(conf)  
val ssc = new StreamingContext(sc, Seconds(3)) 
//使用receiverStream方法定義輸入源  
val lines = ssc.receiverStream(new MyReceiver(5))  
// 計算每個batch的所有數字的平均值  
val ds = lines.map(x=>x.split(" ").map(x=>x.toInt)).map(x=>x.sum/5)  
// 使用foreachRDD儲存輸入流資料  
lines.foreachRDD(x => {  
  if(x.count() > 0) {  
    // 使用RDD的saveAsTextFile儲存RDD資料,儲存目錄以時間戳命名  
    x.saveAsTextFile("hdfs://主機名或ip地址:埠號/檔案路徑" + new Date().getTime.toString)  
  }  
})  
// 使用foreachRDD儲存計算後的資料  
ds.foreachRDD(x => {  
  if(x.count() > 0) {  
    // 使用RDD的saveAsTextFile儲存RDD資料,儲存目錄以時間戳命名  
    x.saveAsTextFile("hdfs://主機名或ip地址:埠號/檔案路徑" + new Date().getTime.toString)  
  }  
})  
ds.print()  
ssc.start()  
ssc.awaitTermination()  
}
}