1. 程式人生 > >Spark Streaming(03)——Dstream及相關操作

Spark Streaming(03)——Dstream及相關操作

1、什麼是Dstream?

Dstream(Discretized Stream)是Spark Streaming的基礎抽象,代表持續性的資料流和經過各種Spark運算元操作後的結果資料流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的資料,如下圖: 在這裡插入圖片描述 應用於DStream的任何操作都轉換為底層RDD上的操作。例如,在先前將行流轉換為字的示例中,flatMap操作應用於linesDStream中的每個RDD 以生成DStream的 wordsRDD。如下圖所示。 在這裡插入圖片描述

Spark Streaming使用資料來源產生的資料流建立DStream,也可以在已有的DStream上使用一些操作來建立新的DStream。 它的工作流程像下面的圖所示一樣,接受到實時資料後,給資料分批次,然後傳給Spark Engine處理最後生成該批次的結果。 在這裡插入圖片描述

2、DStream相關操作

DStream上的操作與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種.

2.1 Transformations on DStreams

與RDD類似,轉換允許修改來自輸入DStream的資料。DStreams支援普通Spark RDD上可用的許多轉換。

Transformation Meaning
map(func) 對DStream中的各個元素進行func函式操作,然後返回一個新的DStream
flatMap(func) 與map方法類似,只不過各個輸入項可以被輸出為零個或多個輸出項
filter(func) 過濾出所有函式func返回值為true的DStream元素並返回一個新的DStream
repartition(numPartitions) 增加或減少DStream中的分割槽數,從而改變DStream的並行度
union(otherStream) 將源DStream和輸入引數為otherDStream的元素合併,並返回一個新的DStream.
count() 通過對DStream中的各個RDD中的元素進行計數,然後返回只有一個元素的RDD構成的DStream
reduce(func) 對源DStream中的各個RDD中的元素利用func進行聚合操作,然後返回只有一個元素的RDD構成的新的DStream.
countByValue() 對於元素型別為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數
reduceByKey(func, [numTasks]) 利用func函式對源DStream中的key進行聚合操作,然後返回新的(K,V)對構成的DStream
join(otherStream, [numTasks]) 輸入為(K,V)、(K,W)型別的DStream,返回一個新的(K,(V,W))型別的DStream
cogroup(otherStream, [numTasks]) 輸入為(K,V)、(K,W)型別的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組型別的DStream
transform(func) 通過RDD-to-RDD函式作用於DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD
updateStateByKey(func) 根據key的之前狀態值和key的新值,對key進行更新,返回一個新狀態的DStream

特殊的Transformations (1)UpdateStateByKey Operation UpdateStateByKey用於記錄歷史記錄,儲存上次的狀態 該updateStateByKey操作允許您在使用新資訊持續更新時,保持任意狀態。要使用它,您必須執行兩個步驟。 定義狀態 - 狀態可以是任意資料型別。 定義狀態更新功能 - 使用函式指定如何使用先前狀態和輸入流中的新值更新狀態。 scala程式碼: def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = … // add the new values with the previous running count to get the new count Some(newCount) }

(2)Window Operations(開窗函式) 滑動視窗轉換操作: 滑動視窗轉換操作的計算過程如下圖所示,我們可以事先設定一個滑動視窗的長度(也就是視窗的持續時間),並且設定滑動視窗的時間間隔(每隔多長時間執行一次計算),然後,就可以讓視窗按照指定時間間隔在源DStream上滑動,每次視窗停放的位置上,都會有一部分DStream被框入視窗內,形成一個小段的DStream,這時,就可以啟動對這個小段DStream的計算。 在這裡插入圖片描述 如上圖所示: 紅色的矩形就是一個視窗,視窗框住的是一段時間內的資料流。 這裡面每一個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個單位時間,視窗會slide一次。 所以基於視窗的操作,需要指定2個引數: • window length - The duration of the window (3 in the figure) • slide interval - The interval at which the window-based operation is performed (2 in the figure). a.視窗大小,一段時間內資料的容器。 b.滑動間隔,每隔多久計算一次。

Output Operations on DStreams

Output Operations可以將DStream的資料輸出到外部的資料庫或檔案系統,當某個Output Operations被呼叫時,spark streaming程式才會開始真正的計算過程。 類似於RDD的Action.

Output Operation Meaning
print() 列印到控制檯
saveAsTextFiles(prefix, [suffix]) 儲存流的內容為文字檔案,檔名為"prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) 儲存流的內容為SequenceFile,檔名為 “prefix-TIME_IN_MS[.suffix]”.
saveAsHadoopFiles(prefix, [suffix]) 儲存流的內容為hadoop檔案,檔名為 “prefix-TIME_IN_MS[.suffix]”.
foreachRDD(func) 對Dstream裡面的每個RDD執行func