1. 程式人生 > >SparkStreaming(五)操作函式之Transformations

SparkStreaming(五)操作函式之Transformations

目錄:

5、SparkStreaming中的操作函式分析

5.1、Transformations

5.1.1、map(func)

5.1.2、flatMap(func)

5.1.4、union(otherStream)

5.1.5、count()

5.1.6、reduce(func)

5.1.8、reduceByKey(func, [num Tasks])

5.1.9、join(otherStream, [numTasks])

5.1.10、cogroup(otherStream, [numTasks])

5.1.11、transform(func)

5.1.12、updateStateByKey(func)


5、SparkStreaming中的操作函式分析

根據Spark官方文件中的描述,在Spark Streaming應用中,一個DStream物件可以呼叫多種操作,主要分為以下幾類。

  • Transformations 普通的轉換操作
  • Window Operations 視窗轉換操作
  • Join Operations 合併操作
  • Output Operations 輸出操作

5.1、Transformations

普通的轉換操作如下表所示:

Transformation

Meaning

map(func)

利用方法func對源DStream中的元素分別進行處理,並返回一個新的DStream。

flatMap(func)

和map類似,不過每個輸入元素可以被對映為0或多個輸出元素。

filter(func)

選取被func方法計算後返回true的元素,形成新的DSteeam並返回。

repartition(numPartitions)

通過增加或減少分割槽數改變DStream的並行度。

union(otherStream

)

將源DStream和otherDStream中所有元素取並集,形成一個新的DStream並返回。

count()

計算DStream中的每個RDD中的元素個數,每個RDD返回一個“單元素RDD”,這些單元素RDD組成新的DStream並返回。

reduce(func)

對DStream中每個RDD中的所有元素分別進行聚合,每個RDD生成一個單元素RDD,這些單元素RDD組成新的DStream並返回,func函式接受兩個引數並有一個返回值,且func操作必須是associative和 commutative,這樣才能支援平行計算。

countByValue()

對元素型別為K的DStream呼叫該方法,將返回型別為(K,Long)鍵值對的新DStream。“鍵”對應的“值”是該“鍵”在源DStream中每個RDD中的出現頻率。

reduceByKey(func, [numTasks])

當對元素型別為(K, V)對的DStream呼叫該方法,返回(K,V)對型別的新DStream,其中使用給定的reduce函式聚合每個鍵的值。注意:預設情況下,它使用Spark的預設並行任務數(本地模式下為2,群集模式中的並行數由屬性spark.default.parallelism指定)進行分組。您可以傳遞一個可選的numTasks引數來設定task的數量。

join(otherStream, [numTasks])

當源DStream型別為(K, V),otherStream型別為(K, W)時,返回一個新的型別為(K, (V,W))的DStream。

cogroup(otherStream, [numTasks])

當源DStream型別為(K, V),otherStream型別為(K, W)時,返回一個新的型別為(K, Seq[V], Seq[W])的DStream。

transform(func)

通過對源DStream的每個RDD應用RDD-to-RDD函式來返回一個新的DStream。這可以用於對DStream進行任意RDD操作。

updateStateByKey(func)

返回一個新的“state”DStream,其中通過對key的先前狀態和新的values應用給定的方法func,將計算結果用來更新每個key的狀態。這可以用於維護每個key的任意的狀態資料。

5.1.1、map(func)

map操作需要傳入一個函式當做引數,具體呼叫形式為:

JavaDStream<String> b = a.map(func);

主要作用是,對DStream物件a,將func函式作用到a中的每個元素上並生成一個新的元素,得到DStream物件b中包含的這些新元素。

下面示例程式碼的作用是,在接收到的一行訊息後面拼接一個”_NEW”字串。

JavaDStream<String> linesNew = lines.map(line -> line + "_NEW");

服務端:

客戶端:

         注意與接下來的flatMap操作比較。

5.1.2、flatMap(func)

類似於上面的map操作,具體呼叫形式為:

         JavaDStream<String> b = a.flatMap(func);

         主要作用是,對DStream物件a,將func函式作用到a中的每個元素上並生成0個或多個新元素,得到的DStream物件b包含這些新的元素。

         下面示例程式碼的作用是,在接收到的一行訊息lines後,將lines根據空格進行分割,分割成若干個單詞。

         JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

服務端:

客戶端:

map和flatMap的區別:

map函式會對每一條輸入進行指定的操作,然後為每一條輸入返回一個物件。

而flatMap函式則是兩個操作的集合——正是“先對映後扁平化”:

操作1:同map函式一樣:對每一條輸入進行指定的操作,然後為每一條輸入返回一個物件。

操作2:最後將所有物件合併為一個物件。

5.1.3、filter(func)

filter傳入一個func函式,具體呼叫形式為:

         JavaDStream<String> b = a.filter(func);

         對DStream物件a中的每一個元素,應用func方法進行計算,如果func返回結果為true,則保留該元素,否則丟棄該元素,返回一個新的DStream物件b。

         下面示例程式碼中,對words進行判斷,去除”hello”這個單詞。

         JavaDStream<String> filterWords = words.filter(word -> !word.equals("hello"));

服務端:

客戶端:

5.1.4、union(otherStream)

這個操作將兩個DStream進行合併,生成一個包含著兩個DStream中所有元素的的新DStream物件。

下面程式碼,首先將輸入的每個單詞後面分別拼接一個”_one”和”_two”,最後將這兩個DStream合併成一個新的DStream。

JavaDStream<String> wordsOne = words.map(word -> word + "_one");

JavaDStream<String> wordsTwo = words.map(word -> word + "_two");

JavaDStream<String> unionWords = wordsOne.union(wordsTwo);

服務端:

客戶端:

結果:

5.1.5、count()

         統計DStream中每個RDD包含的元素的個數,得到一個新的DStream,這個DStream只包含一個元素,這個元素是對語句單詞統計數值。

JavaDStream<Long> wordsCount = words.count();

         執行結果如下,一行輸入4個單詞,列印的結果也為4。

服務端:

客戶端:

結果:

5.1.6、reduce(func)

返回一個包含一個元素的DStream,傳入的func方法會作用在呼叫者的每個元素上,將其中的元素順次的兩兩進行計算。

下面的程式碼,將每一個單詞用"-"符號進行拼接。

服務端:

客戶端:

結果:

5.1.7 countByValue()

某個DStream中的元素型別為K,呼叫這個方法後,返回的DStream的元素為(K, Long)對,後面這個Long值是原DStream中每個RDD元素key出現的頻率。

         以下程式碼統計words中不同單詞的個數:

         JavaDStream<Long> countByValueWords = words.countByValue();

服務端:

客戶端:

結果:

5.1.8、reduceByKey(func, [num Tasks])

         呼叫這個操作的DStream是以(K, V)的形式出現,返回一個新的元素格式為(K, V)的DStream。返回結果中,K為原來的K,V是由K經過傳入func計算得到的。還可以傳入一個平行計算的引數,在local模式下,預設為2。在其他模式下,預設值由引數spark.default.parallelism確定。

         下面程式碼將words轉化成(word, 1)的形式,再以單詞為key,個數為value,進行word count。

         JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));  

JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a,b) -> (a + b));

服務端:

客戶端:

結果:

5.1.9、join(otherStream, [numTasks])

         由一個DStream物件呼叫該方法,元素內容為(K, V),傳入另一個DStream物件,元素內容為(K, W),返回的DStream中包含的內容是(K, (V, W))。這個方法也可以傳入一個平行計算的引數,該引數與reduceByKey中是相同的。

         下面程式碼中,首先將words轉化成(word, (word + “_one”))和(word, (word + “_two”))的形式,再以word為key,將後面的value合併到一起。

服務端:

客戶端:

結果:

5.1.10、cogroup(otherStream, [numTasks])

由一個DStream物件呼叫該方法,元素內容為(k, V),傳入另一個DStream物件,元素內容為(k, W),返回的DStream中包含的內容是(k, (Seq[V], Seq[W]))。這個方法也可以傳入一個平行計算的引數,該引數與reduceByKey中是相同的。

 下面程式碼首先將words轉化成(word, (word + "_one"))和(word, (word + "_two"))的形式,再以word為key,將後面的value合併到一起。

服務端:

客戶端:

結果:

5.1.11、transform(func)

在Spark-Streaming官方文件中提到,DStream的transform操作極大的豐富了DStream上能夠進行的操作內容。

使用transform操作後,除了可以使用DStream提供的一些轉換方法之外,還能夠直接呼叫任意的呼叫RDD上的操作函式。 

         比如下面的程式碼中,使用transform完成將一行語句分割成單詞的功能。

                   JavaDStream<String> words = lines.transform(line -> line.

                                     flatMap(a -> Arrays.asList(a.split(" ")).iterator()

                   ));

服務端:

客戶端:

結果:

5.1.12、updateStateByKey(func)

updateStateByKey操作以DStream中的資料進行按keyreduce操作,然後對各個批次的資料進行累加。在有新的資料持續更新時保持任意的狀態。要使用這個操作要操作兩個步驟。

1、 定義狀態:可以是任意的資料型別

2、 定義狀態更新函式使用更新函式來指定如何使用先前狀態和輸入流中的新值更新狀態。

從輸入流中的新值更新狀態。對於有狀態操作,要不斷的把當前和歷史的時間切片的RDD累加計算,隨著時間的流失,計算的資料規模會變得越來越大。

在每個批處理中,Spark都將對所有現有鍵應用狀態更新函式,而不管它們在批處理中是否有新資料。如果update函式返回None,那麼key-value對將被消除。

讓我們舉個例子來說明這一點。假設您希望在文字資料流中維護每個單詞的執行計數。在這裡,執行計數是狀態,它是一個整數。我們將更新功能定義為:

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =

(values, state) -> {

Integer newSum = ... // 新增新值與之前的執行計數以獲得新計數

return Optional.of(newSum);

};

這是在一個包含單詞的DStream上應用(比如說,在上述示例中包含(word,1)鍵值對的DStream  

         將為每個單詞呼叫更新函式,其中newValues具有1的順序(來自(word, 1)鍵值對)具有runningCount持有先前的計數。

         需注意:使用updateStateByKey需要配置檢查點目錄。

即如果要不斷的更新每一個key的state,就一定會涉及到狀態的儲存和容錯,這個時候就需要開啟checkpoint機制和功能,需要說明的是checkpoint的資料可以儲存在一些儲存在檔案系統上的內容。例如:程式未處理的但已經擁有狀態的資料。

報錯解決問題做checkpoint,開啟checkpoint機制,把checkpoint中的資料放在這裡設定的目錄中。生產環境下一般放在HDFS中。

jssc.checkpoint("hdfs://192.1.101.61:9000/in/ch");