Spark Streaming詳解----概述、基本概念、效能調優
1 概述
1.1 SparkStreaming是什麼
Spark Streaming 是個批處理的流式(實時)計算框架。其基本原理是把輸入資料以某一時間間隔批量的處理,當批處理間隔縮短到秒級時,便可以用於處理實時資料流。
支援從多種資料來源獲取資料,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis以及TCP sockets,從資料來源獲取資料之後,可以使用諸如map、reduce、join等高階函式進行復雜演算法的處理。最後還可以將處理結果儲存到檔案系統,資料庫等。
Spark Streaming處理的資料流圖:
以上的連續4個圖,分別對應以下4個段落的描述:
- Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各種來源的實時輸入資料,進行處理後,處理結果儲存在HDFS、Databases等各種地方。
- Spark Streaming接收這些實時輸入資料流,會將它們按批次劃分,然後交給Spark引擎處理,生成按照批次劃分的結果流。
- Spark Streaming提供了表示連續資料流的、高度抽象的被稱為離散流的DStream。DStream本質上表示RDD的序列。任何對DStream的操作都會轉變為對底層RDD的操作。
- Spark Streaming使用資料來源產生的資料流建立DStream,也可以在已有的DStream上使用一些操作來建立新的DStream。
1.2 2. Spark Streaming能做什麼
目前而言SparkStreaming 主要支援以下三種業務場景
- 無狀態操作:只關注當前批次中的實時資料,例如:
- 商機標題分類,分類http請求端 -> kafka -> Spark Streaming -> http請求端Map -> 響應結果
- 網庫Nginx訪問日誌收集,flume->kafka -> Spark Streaming -> hive/hdfs
- 資料同步,網庫主站資料通過“主站”->kafka->Spark Streaming -> hive/hdfs
- 有狀態操作:對有狀態的DStream進行操作時,需要依賴之前的資料 除了當前新生成的小批次資料,但還需要用到以前所生成的所有的歷史資料。新生成的資料與歷史資料合併成一份流水錶的全量資料例如:
- 實時統計網庫各個站點總的訪問量
- 實時統計網庫每個商品的總瀏覽量,交易量,交易額。
- 視窗操作:定時對指定時間段範圍內的DStream資料進行操作,例如:
- 網庫主站的惡意訪問、爬蟲,每10分鐘統計30分鐘內訪問次數最多的使用者。
1.3 特性
1.3.1 優點:
- 吞吐量大、速度快。
- 容錯:SparkStreaming在沒有額外程式碼和配置的情況下可以恢復丟失的工作。checkpoint。
- 社群活躍度高。生態圈強大。
- 資料來源廣泛。
1.3.2 缺點:
- 延遲。500毫秒已經被廣泛認為是最小批次大小,這個相對storm來說,還是大很多。所以實際場景中應注意該問題,就像標題分類場景,設定的0.5s一批次,加上處理時間,分類介面會佔用1s的響應時間。實時要求高的可選擇使用其他框架。
2 基礎概念-開發
2.1 簡單示例
2.1.1 Word count詞頻計算demo
- object NetworkWordCount {
- def main(args: Array[String]) {
- val sparkConf = new SparkConf()
- val ssc = new StreamingContext(sparkConf, Seconds(1))
- val lines = ssc.socketTextStream(hostname, port)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
2.1.2 說明
1. 通過建立輸入DStreams來定義輸入源。
2. 通過將轉換和輸出操作應用於DStream來定義流式計算。
3. 開始接收資料並使用它進行處理streamingContext.start()
。
4. 等待處理停止(手動或由於任何錯誤)使用streamingContext.awaitTermination()
。
5. 可以手動停止處理streamingContext.stop()
。
2.1.3 注意
1. 一旦上下文開始,就不能設定或新增新的流計算。
2. 一旦上下文停止,它將無法重新啟動。
3. 只有一個StreamingContext可以在JVM中同時處於活動狀態。
2.2 輸入源
Spark Streaming提供了兩類輸入源。
· 基本來源:StreamingContextAPI中直接提供的資源。示例:檔案系統,套接字連線。
1.檔案系統:streamingContext.fileStream(hdfsDataDirectory)
SparkStreaming將監聽目錄dataDirectory
並處理在該目錄中建立的任何檔案(不支援巢狀目錄中寫入的檔案)
檔案必須具有相同的資料格式。
必須dataDirectory通過將資料原子移動或重新命名為資料目錄來建立檔案。
移動後,檔案不能更改。因為,如果檔案被不斷附加,則不會讀取新的資料。
Sparkstreaming
監聽對應主機-埠,處理髮送到該埠的資料。
· 高階來源:Kafka,Flume等資源可以通過額外的實用類來獲得。
實際應用場景中,Kafak使用較多,主要介紹Kafka的使用:
KafkaUtils.createStream(ssc, zkQuorum, groupId, topicsMap)
ssc:streamingContext
zkQuorum:kafka元資料在zookeeper中的儲存地址(示例:node1:2181/kafka)
groupId:spark streaming接受kafka資料使用的使用者組id,可通過該引數控制每次接受kafka資料的索引位置,spark streaming每次啟動都會從該groupId上次接收到的資料位置開始接收。
topicsMap:Map[String, Int]型別物件,key對應接收的資料 topic名稱,value為執行緒數量。sparkstreaming接收kafka資料的啟動的執行緒數量,即併發量
如果要在流式應用程式中並行接收多個數據流,則可以建立多個輸入DStream
2.3 DStream轉換操作
操作 | 含義 |
map(func) | 通過傳遞源DStream的每個元素通過函式func返回一個新的DStream |
flatMap(func) | 與map類似,但每個輸入項可以對映到0個或更多的輸出項。 |
filter(func) | 通過僅選擇func返回true的源DStream的記錄來返回新的DStream |
repartition(numPartitions) | 通過建立更多或更少的分割槽來更改此DStream中的並行級別。 |
union(otherStream) | 返回一個新的DStream,它包含源DStream和otherDStream中元素的並集。 |
count() | 通過計算源DStream的每個RDD中的元素數量來返回單元素RDD的新DStream |
reduce(func) | 通過使用函式func(它需要兩個引數並返回一個),通過聚合源DStream的每個RDD中的元素來返回單元素RDD的新DStream。該函式應該是關聯的,以便可以平行計算。 |
countByValue() | 當呼叫型別為K的元素的DStream時,返回一個新的DStream(K,Long)對,其中每個鍵的值是源DStream的每個RDD中的頻率。 |
reduceByKey(func,[numTasks]) | 當(K,V)對的DStream被呼叫時,返回(K,V)對的新DStream,其中使用給定的reduce函式聚合每個鍵的值。注意:預設情況下,使用Spark的預設並行任務數(2為本地模式,群集模式中的數字由config屬性決定spark.default.parallelism )進行分組。您可以傳遞可選numTasks 引數來設定不同數量的任務。 |
join(otherStream,[numTasks]) | 當(K,V)和(K,W)對的兩個DStream被呼叫時,返回一個新的(K,(V,W))對的DStream與每個鍵的所有元素對。 |
cogroup(otherStream,[numTasks]) | 當呼叫(K,V)和(K,W)對的DStream時,返回一個新的DStream(K,Seq [V],Seq [W])元組。 |
transform(func) | 通過對源DStream的每個RDD應用RDD到RDD函式來返回一個新的DStream。這可以用於對DStream進行任意RDD操作。 |
updateStateByKey(func) | 返回一個新的“狀態”DStream,其中通過對鍵的先前狀態應用給定的功能和鍵的新值來更新每個鍵的狀態。這可以用於維護每個金鑰的任意狀態資料。 |
Transform操作:
- val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...)
- val cleanedDStream = wordCounts.transform(rdd => {
- rdd.join(spamInfoRDD).filter(...) ...
- })
UpdateStateByKey 操作:
- val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
- //產生我們需要的pair rdd
- val linerdd = lines.map{row =>{
- ···
- (key, amt)
- }}
- val addFunc = (currValues: Seq[Int], preValueState: Option[Int]) =>{
- //通過spark內部的reducebykey按key規約,然後這裡傳入某key當前批次的seq,再計算key的總和
- val currentCount = currValues.sum
- //已經累加的值
- val previousCount = preValueState.getOrElse(0)
- //返回累加後的結果,是一個Option[Int]型別
- Some(currentCount + previousCount)
- }
- linerdd.updateStateByKey[Int](addFunc _).print()
Windows操作
下圖說明了這個視窗。
如圖:
1. 紅色的矩形就是一個視窗,視窗hold的是一段時間內的資料流。
2.這裡面每一個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個單位時間,視窗會slide一次。
所以基於視窗的操作,需要指定2個引數:
· window length - The duration of the window (3 inthe figure)
· slide interval - The interval at which the window-basedoperation is performed (2 in the figure).
舉個例子吧:
還是以wordcount舉例,每隔10秒,統計一下過去30秒過來的資料。
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
這裡的paris就是一個DStream,每條資料類似(word,1)
一些常見的視窗操作如下。所有這些操作都採用上述兩個引數 - windowLength和slideInterval。
操作 | 含義 |
window(windowLength,slideInterval) | 返回基於源DStream的視窗批次計算的新DStream。 |
countByWindow(windowLength,slideInterval) | 返回流中元素的滑動視窗數。 |
reduceByWindow(func,windowLength,slideInterval) | 返回一個新的單元素流,通過使用func在滑動間隔中通過在流中聚合元素建立。 |
reduceByKeyAndWindow(func,windowLength,slideInterval,[ numTasks ]) | 當對(K,V)對的DStream進行呼叫時,返回(K,V)對的新DStream,其中每個鍵的值 在滑動視窗中使用給定的減少函式func進行聚合。 |
countByValueAndWindow(windowLength, slideInterval,[numTasks ]) | 當呼叫(K,V)對的DStream時,返回(K,Long)對的新DStream,其中每個鍵的值是其滑動視窗內的頻率。 |
2.4 DStream的輸出操作
輸出操作允許將DStream的資料推送到外部系統,如資料庫或檔案系統。由於輸出操作實際上允許外部系統使用變換後的資料,所以它們觸發所有DStream變換的實際執行(類似於RDD的動作)。目前,定義了以下輸出操作:
操作 | 含義 |
print() | 在執行流應用程式的驅動程式節點上的DStream中列印每批資料的前十個元素。 |
saveAsTextFiles(prefix,[ suffix ]) | 將此DStream的內容另存為文字檔案。基於產生在每批間隔的檔名的字首和字尾:“字首TIME_IN_MS [.suffix]”。 |
saveAsObjectFiles(prefix,[ suffix ]) | 將DStream的內容儲存為 |
saveAsHadoopFiles(prefix,[ suffix]) | 將此DStream的內容另存為Hadoop檔案。基於產生在每批間隔的檔名的字首和字尾:“字首TIME_IN_MS [.suffix]”。 |
foreachRDD(func) | 對從流中生成的每個RDD 應用函式func的最通用的輸出運算子。此功能應將每個RDD中的資料推送到外部系統,例如將RDD儲存到檔案,或將其通過網路寫入資料庫。請注意,函式func在執行流應用程式的驅動程式程序中執行,通常會在其中具有RDD動作,從而強制流式傳輸RDD的計算。 |
注意:
- DStreams通過輸出操作進行延遲執行,就像RDD由RDD操作懶惰地執行。具體來說,DStream輸出操作中的RDD動作強制處理接收到的資料。因此,如果您的應用程式沒有任何輸出操作,或者具有輸出操作,比如dstream.foreachRDD()沒有任何RDD操作,那麼任何操作都不會被執行。系統將簡單地接收資料並將其丟棄。
- 預設情況下,輸出操作是一次一個執行的。它們按照它們在應用程式中定義的順序執行。
2.5 DataFrame和SQL操作
可以輕鬆地在流資料上使用DataFrames和SQL操作。您必須使用StreamingContext正在使用的SparkContext建立一個SparkSession。此外,必須這樣做,以便可以在驅動程式故障時重新啟動。
這在下面的示例中顯示。將每個RDD轉換為DataFrame,註冊為臨時表,然後使用SQL進行查詢。
- val words: DStream[String] = ...
- words.foreachRDD { rdd =>
- // Get the singleton instance of SparkSession
- val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
- import spark.implicits._
- // Convert RDD[String] to DataFrame
- val wordsDataFrame = rdd.toDF("word")
- // Create a temporary view
- wordsDataFrame.createOrReplaceTempView("words")
- // Do word count on DataFrame using SQL and print it
- val wordCountsDataFrame =
- spark.sql("select word, count(*) as total from words group by word")
- wordCountsDataFrame.show()
- }
2.6 MLlib操作
可以通過訓練出個模型,然後將模型作為廣播變數,在DStream操作中使用該模型預測相關資料。
2.7 快取/永續性
與RDD類似,DStreams還允許開發人員將流的資料保留在記憶體中。也就是說,使用persist()
DStream上的方法將自動將該DStream的每個RDD保留在記憶體中。如果DStream中的資料將被多次計算(例如,相同資料上的多個操作),這是非常有用的。對於基於視窗的操作,像reduceByWindow
和reduceByKeyAndWindow
和基於狀態的操作一樣updateStateByKey
,這是隱含的。因此,基於視窗的操作生成的DStreams將自動保留在記憶體中,無需開發人員的呼叫persist()
。
2.8 CheckPoint/檢查點
流式應用程式必須全天候執行,因此必須能夠適應與應用程式邏輯無關的故障(例如,系統故障,JVM崩潰等)。為了可以這樣做,Spark Streaming