Spark Streaming基礎總結
最近在看Spark的相關東西,主要包括Spark的基礎知識、Spark Streaming等,寫了一些總結。
1 Introduction
從時間維度可以將資料分析可以分為歷史資料的分析和實時資料的分析,例如Hive可以實現對於歷史全量資料的計算,但是花費時間往往較長。實際場景中,如“雙11”各大電商平臺實時計算當前訂單的情況時,需要實時對各個訂單的資料依次進行採集、分析處理、儲存等步驟,對於資料處理的速度要求很高,而且需要保持一段時期內不間斷的執行。對於這類問題,Spark通過Spark Streaming元件提供了支援,Spark Streaming可以實現對於高吞吐量、實時的資料流的處理和分析,支援多樣化的資料來源如Kafka、Flume、HDFS、Kinesis和Twitter等。
2 DStream Model
類似於RDD之於Spark,Spark Streaming也有自己的核心資料物件,稱為DStream(Discretized Stream,離散流)。使用Spark Streaming需要基於一些資料來源建立DStream,然後在DStream上進行一些資料操作,這裡的DStream可以近似地看作是一個比RDD更高一級的資料結構或者是RDD的序列。
雖然Spark Streaming聲稱是進行流資料處理的大資料系統,但從DStream的名稱中就可以看出,它本質上仍然是基於批處理的。DStream將資料流進行分片,轉化為多個batch,然後使用Spark Engine對這些batch進行處理和分析。
這裡的batch是基於時間間隔來進行分割的,這裡的__批處理時間間隔__(batch interval)需要人為確定,每一個batch的資料對應一個RDD例項。
2.1 Input DStream
Input DStream是一種特殊的DStream,它從外部資料來源中獲取原始資料流,連線到Spark Streaming中。Input DStream可以接受兩種型別的資料輸入:
(1)基本輸入源:檔案流、套接字連線和Akka Actor。
(2)高階輸入源:Kafka、Flume、Kineis、Twitter等。
基本輸入源可以直接應用於StreamingContext API,而對於高階輸入源則需要一些額外的依賴包。
由於Input DStream要接受外部的資料輸入,因此每個Input DStream(不包括檔案流)都會對應一個單一的接收器(Receiver)物件,每個接收器物件都對應接受一個數據流輸入。由於接收器需要持續執行,因此會佔用分配給Spark Streaming的一個核,如果可用的核數不大於接收器的數量,會導致無法對資料進行其他變換操作。
2.2 DStream Transformation
DStream的轉換也和RDD的轉換類似,即對於資料物件進行變換、計算等操作,但DStream的Transformation中還有一些特殊的操作,如updateStateByKey()、transform()以及各種Window相關的操作。下面列舉了一些主要的DStream操作:
Transformation | Interpretation |
---|---|
map(func) | 對DStream中的各個元素進行func函式操作,然後返回一個新的DStream. |
flatMap(func) | 與map方法類似,只不過各個輸入項可以被輸出為零個或多個輸出項. |
filter(func) | 過濾出所有函式func返回值為true的DStream元素並返回一個新的DStream. |
repartition(numPartitions) | 增加或減少DStream中的分割槽數,從而改變DStream的並行度. |
union(otherStream) | 將源DStream和輸入引數為otherDStream的元素合併,並返回一個新的DStream. |
count() | 通過對DStream中的各個RDD中的元素進行計數. |
reduce(func) | 對源DStream中的各個RDD中的元素利用func進行聚合操作,然後返回只有一個元素的RDD構成的新的DStream. |
countByValue() | 對於元素型別為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數. |
reduceByKey(func, [numTasks]) | 利用func函式對源DStream中的key進行聚合操作,然後返回新的(K,V)對構成的DStream. |
join(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)型別的DStream,返回一個新的(K,(V,W))型別的DStream. |
cogroup(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)型別的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組型別的DStream. |
transform(func) | 通過RDD-to-RDD函式作用於DStream中的各個RDD,返回一個新的RDD. |
updateStateByKey(func) | 根據於key的前置狀態和key的新值,對key進行更新,返回一個新狀態的DStream. |
Spark Streaming提供了基於視窗(Window)的計算,即可以通過一個滑動視窗,對原始DStream的資料進行轉換,得到一個新的DStream。這裡涉及到兩個引數的設定:
(1)視窗長度(window length):一個視窗覆蓋的流資料的時間長度,必須是批處理時間間隔的倍數。視窗長度決定了一個視窗內包含多少個batch的資料。
(2)視窗滑動時間間隔(slide interval):前一個視窗到後一個視窗所經過的時間長度,必須是批處理時間間隔的倍數。
2.3 DStream Output Operations
DStream的輸出操作(Output Operations)可以將DStream的資料輸出到外部的資料庫或檔案系統。與RDD的Action類似,當某個Output Operation被呼叫時,Spark Streaming程式才會開始真正的計算過程。
下面列舉了一些具體的輸出操作:
Output Operations | Interpretation |
---|---|
print() | 列印到控制檯. |
saveAsTextFiles(prefix, [suffix]) | 儲存DStream的內容為文字檔案,檔名為”prefix-TIME_IN_MS[.suffix]”. |
saveAsObjectFiles(prefix, [suffix]) | 儲存DStream的內容為SequenceFile,檔名為 “prefix-TIME_IN_MS[.suffix]”. |
saveAsHadoopFiles(prefix, [suffix]) | 儲存DStream的內容為Hadoop檔案,檔名為”prefix-TIME_IN_MS[.suffix]”. |
foreachRDD(func) | 對Dstream裡面的每個RDD執行func,並將結果儲存到外部系統,如儲存到RDD檔案中或寫入資料庫. |
3 Fault Tolerance
容錯(Fault Tolerance)指的是一個系統在部分模組出現故障時仍舊能夠提供服務的能力。一個分散式資料處理程式要能夠長期不間斷執行,這就要求計算模型具有很高的容錯性。
Spark操作的資料一般儲存與類似HDFS這樣的檔案系統上,這些檔案系統本身就有容錯能力。但是由於Spark Streaming處理的很多資料是通過網路接收的,即接收到資料的時候沒有備份,為了讓Spark Streaming程式中的RDD都能夠具有和普通RDD一樣的容錯性,這些資料需要被複制到多個Worker節點的Executor記憶體中。
Spark Streaming通過檢查點(Check Point)的方式來平衡容錯能力和代價問題。DStream依賴的RDD是可重複的資料集,每一個RDD從建立之初都記錄了每一步的計算過程,如果RDD某個分割槽由於一些原因資料丟失了,就可以重新執行計算來恢復資料。隨著執行時間的增加,資料恢復的代價也會隨著計算過程而增加,因此Spark提供了檢查點功能,定期記錄計算的中間狀態,避免資料恢復時的漫長計算過程。Spark Streaming支援兩種型別的檢查點:
(1)元資料檢查點。這種檢查點可以記錄Driver的配置、操作以及未完成的批次,可以讓Driver節點在失效重啟後可以繼續執行。
(2)資料檢查點。這種檢查點主要用來恢復有狀態的轉換操作,例如updateStateByKey或者reduceByKeyAndWindow操作,它可以記錄資料計算的中間狀態,避免在資料恢復時,長依賴鏈帶來的恢復時間的無限增長。
開啟檢查點功能需要設定一個可靠的檔案系統路徑來儲存檢查點資訊,程式碼如下:
streamingContext.checkpoing(checkPointDirectory) //引數是檔案路徑
為了讓Driver自動重啟時能夠開啟檢查點功能,需要對原始StreamingContext進行簡單的修改,建立一個檢查檢查點目錄的函式,程式碼如下:
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...)
ssc.checkpoint(checkpointDirecroty) //設定檢查點目錄
...
val lines = ssc.socketTextStream(...) //建立DStream
}
//從檢查點目錄恢復一個StreamingContext或者建立一個新的
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
functionToCreateContext())
//啟動context
context.start()
context.awaitTermination()
參考文獻
[1] 於俊. Spark核心技術與高階應用[M]. 機械工業出版社, 2016.
[2] 陳歡林世飛. Spark最佳實踐[M].人民郵電出版社, 2016.