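Similar to RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDDs. Some of the common ones are listed below: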
Transformation | Meaning |
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
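The per-batch behaviour of a few of these transformations can be sketched with plain Scala collections. This is only a model, not the Spark API: the `Batches` alias, the `BatchTransformDemo` object and the `wordCounts` helper are hypothetical names, and a DStream is approximated as a sequence of batches, with each transformation applied to every batch independently.

```scala
object BatchTransformDemo {
  // Hypothetical stand-in for a DStream: a sequence of batches, each batch a Seq of records.
  type Batches[A] = Seq[Seq[A]]

  // flatMap, filter, map and a reduceByKey-style sum, applied to every batch on its own.
  def wordCounts(lines: Batches[String]): Batches[(String, Int)] =
    lines.map { batch =>
      batch
        .flatMap(_.split(" "))     // flatMap: one line becomes zero or more words
        .filter(_.nonEmpty)        // filter: keep only non-empty tokens
        .map(word => (word, 1))    // map: word becomes (word, 1)
        .groupBy(_._1)             // reduceByKey equivalent: sum the counts per word
        .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
        .toSeq
        .sortBy(_._1)              // sorted only to make the result deterministic
    }
}
```

Counts never carry over from one batch to the next here, which is the behaviour the stateless transformations in the table describe.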
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  val newCount = ... // add the new values with the previous running count to get the new count
  Some(newCount)
}
This function is applied to a DStream containing words (for example, the pairs DStream from the example in Section 2, which contains (word, 1) pairs).
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
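To make the state update concrete, here is a minimal sketch in plain Scala collections rather than the Spark API. It assumes the elided `...` in `updateFunction` is a simple sum of the new values and the previous count; `RunningCountDemo` and `run` are hypothetical names, and the stream is modelled as a sequence of (word, 1) batches.

```scala
object RunningCountDemo {
  // Same signature as updateFunction in the text; the body is an assumption
  // about what the elided "..." computes.
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(runningCount.getOrElse(0) + newValues.sum)

  // Hypothetical helper: fold the batches into per-key state, calling the update
  // function once per batch for every key that has state or new values.
  def run(batches: Seq[Seq[(String, Int)]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
      val grouped = batch.groupBy(_._1).map { case (k, pairs) => (k, pairs.map(_._2)) }
      val keys = state.keySet ++ grouped.keySet
      keys.flatMap { k =>
        updateFunction(grouped.getOrElse(k, Seq.empty), state.get(k)).map(v => k -> v)
      }.toMap
    }
}
```

Because the update function returns an `Option`, returning `None` for a key would drop that key from the state in this model.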
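The transform operation allows an arbitrary RDD-to-RDD function to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the ability to join every batch of a DStream with another dataset is not directly exposed in the DStream API, but you can do this easily with transform. This makes many powerful things possible. For instance, you may want to clean data in real time by joining the input DStream with precomputed spam information and then filtering on it.
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
})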
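The join-then-filter idea can be sketched with plain Scala collections. This is a model under stated assumptions, not the Spark API: `TransformDemo`, `transform`, `spamInfo` and `clean` are hypothetical names, the spam table is an in-memory `Map` standing in for `spamInfoRDD`, and its contents are made up for illustration.

```scala
object TransformDemo {
  // Hypothetical stand-in for DStream.transform: apply one batch-to-batch function to every batch.
  def transform[A, B](batches: Seq[Seq[A]])(f: Seq[A] => Seq[B]): Seq[Seq[B]] =
    batches.map(f)

  // Hypothetical spam lookup keyed by word (illustrative data only).
  val spamInfo: Map[String, Boolean] = Map("winner" -> true, "hello" -> false)

  // Inner join each (word, count) with the spam table, then keep the non-spam entries.
  def clean(wordCounts: Seq[Seq[(String, Int)]]): Seq[Seq[(String, Int)]] =
    transform(wordCounts) { batch =>
      batch
        .flatMap { case (word, count) =>
          spamInfo.get(word).map(isSpam => (word, (count, isSpam)))
        }
        .filter { case (_, (_, isSpam)) => !isSpam }
        .map { case (word, (count, _)) => (word, count) }
    }
}
```

As with an inner join, words that have no entry in the spam table are dropped by this sketch; a left outer join would be the alternative if unknown words should pass through.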
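As shown in the figure above, each time the window slides over the source DStream, the source RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. In this example, the operation is applied over the last 3 time units of data and slides by 2 time units. Every window operation needs two parameters: the window length (the duration of the window, 3 in the figure) and the sliding interval (the interval at which the window operation is performed, 2 in the figure).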
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
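The interaction between window length and sliding interval can be sketched with plain Scala collections. This is a simplified model, not the Spark API: `WindowDemo` is a hypothetical name, each inner `Seq` stands for one batch interval, and both parameters are counted in batches instead of `Duration` values.

```scala
object WindowDemo {
  // Hypothetical model of a keyed window reduce with addition as the reduce function.
  def reduceByKeyAndWindow(
      batches: Seq[Seq[(String, Int)]],
      windowLength: Int,
      slideInterval: Int
  ): Seq[Map[String, Int]] =
    // One window is emitted every slideInterval batches; `end` is the index of its last batch.
    (slideInterval - 1 until batches.length by slideInterval).map { end =>
      // Early windows are shorter because fewer than windowLength batches exist yet.
      val start = math.max(0, end - windowLength + 1)
      batches.slice(start, end + 1).flatten
        .groupBy(_._1)
        .map { case (k, pairs) => (k, pairs.map(_._2).sum) }
    }
}
```

With a window length of 3 and a sliding interval of 2, consecutive windows in this model share one batch, which is the overlap that sliding windows produce.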
Transformation | Meaning |
window(windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow(windowLength, slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow(func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable only to "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that [checkpointing](#checkpointing) must be enabled for using this operation. |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. |
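The "inverse reduce" idea behind the incremental reduceByKeyAndWindow variant can be checked with a small model in plain Scala collections. It is a sketch, not the Spark implementation: `IncrementalWindowDemo`, `merge`, `full` and `slide` are hypothetical names, each batch is assumed to be already reduced to a per-key count map, and the window is assumed to slide by exactly one batch.

```scala
object IncrementalWindowDemo {
  type Counts = Map[String, Int]

  // Combine two per-key count maps with f; keys whose result is zero are dropped
  // so that incremental and full results compare equal in this model.
  def merge(a: Counts, b: Counts)(f: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet)
      .map(k => k -> f(a.getOrElse(k, 0), b.getOrElse(k, 0)))
      .filter(_._2 != 0)
      .toMap

  // Full recomputation: reduce every batch inside the window ending at index `end`.
  def full(batches: Seq[Counts], end: Int, windowLength: Int): Counts =
    batches
      .slice(math.max(0, end - windowLength + 1), end + 1)
      .foldLeft(Map.empty[String, Int])((acc, b) => merge(acc, b)(_ + _))

  // Incremental update: reduce in the batch that enters the window (func = +),
  // then inverse reduce the batch that leaves it (invFunc = -).
  def slide(previous: Counts, entering: Counts, leaving: Counts): Counts =
    merge(merge(previous, entering)(_ + _), leaving)(_ - _)
}
```

The incremental path touches only two batches per slide regardless of the window length, which is where the efficiency gain comes from. It works here because subtraction undoes addition; a reduce function such as `max` has no such inverse.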
Stream-stream joins
val stream1: DStream[(String, String)] = ...
val stream2: DStream[(String, String)] = ...
val joinedStream = stream1.join(stream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
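What a stream-stream join produces in each batch interval can be sketched with plain Scala collections. This is a model, not the Spark API: `StreamJoinDemo`, `joinBatch` and `join` are hypothetical names, and each stream is approximated as a sequence of keyed batches aligned by interval.

```scala
object StreamJoinDemo {
  // Inner join of two keyed batches: every pairing of values that share a key.
  def joinBatch[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for {
      l <- left
      r <- right
      if l._1 == r._1
    } yield (l._1, (l._2, r._2))

  // Hypothetical stream model: join the two batches that belong to the same interval.
  def join[K, V, W](s1: Seq[Seq[(K, V)]], s2: Seq[Seq[(K, W)]]): Seq[Seq[(K, (V, W))]] =
    s1.zip(s2).map { case (b1, b2) => joinBatch(b1, b2) }
}
```

In this model, joining windowed streams amounts to replacing each batch with the concatenation of the batches inside its window before calling `joinBatch`.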
Stream-dataset joins
val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
