1. 程式人生 > 實用技巧 >Spark Streaming(一):DStream

Spark Streaming(一):DStream

Spark Streaming:Spark提供的,對於大資料進行實時計算的一種框架;它的底層,也是基於Spark Core的;
其基本的計算模型,還是基於記憶體的大資料實時計算模型RDD,只不過,針對實時計算的特點,在RDD之上,進行了一層封裝,叫做DStream(類似Spark SQL中的DataFrame);所以RDD是整個Spark技術生態的核心。

Spark Streaming是Spark Core Api的一種擴充套件,它可以用於進行大規模、高吞吐量、容錯的實時資料流的處理;支援從很多種資料來源中讀取資料,比如Kafka、Flume、Twitter、ZeroMQ、Kinesis或者TCP Socket,並且能夠使用類似高階函式的複雜演算法來進行資料處理,比如map、reduce、join、window;處理後的資料可以被儲存到檔案系統、資料庫、Dashboard等儲存中。

image.png

2、Spark Streaming工作原理

Spark Streaming內部的基本工作原理:接收實時輸入資料流,然後將資料拆分成多個batch,比如每收集1s的資料封裝為一個batch, 然後將每個batch交給Spark的計算引擎進行處理,最後會生產出一個結果資料流,其中的資料,也是一個個的batch所組成的。其中,一個batchInterval累加讀取到的資料對應一個RDD的資料

image.png

3、DStream

DStream:Discretized Stream,離散流,Spark Streaming提供的一種高階抽象,代表了一個持續不斷的資料流;
DStream可以通過輸入資料來源來建立,比如Kafka、Flume,也可以通過對其他DStream應用高階函式來建立,比如map、reduce、join、window;

  • DStream的內部,其實是一系列持續不斷產生的RDD,RDD是Spark Core的核心抽象,即,不可變的,分散式的資料集;
    DStream中的每個RDD都包含了一個時間段內的資料;
    以下圖為例,0-1這段時間的資料累積構成了RDD@time1,1-2這段時間的資料累積構成了RDD@time2,。。。
image.png
  • 對DStream應用的運算元,其實在底層會被翻譯為對DStream中每個RDD的操作;
    比如對一個DStream執行一個map操作,會產生一個新的DStream,其底層原理為,對輸入DStream中的每個時間段的RDD,都應用一遍map操作,然後生成的RDD,即作為新的DStream中的那個時間段的一個RDD;
    底層的RDD的transformation操作,還是由Spark Core的計算引擎來實現的,Spark Streaming對Spark core進行了一層封裝,隱藏了細節,然後對開發人員提供了方便易用的高層次API。
image.png

4、Spark Streaming VS Storm

image.png
  • 優勢
    從上圖中可以看出,Spark Streaming絕對談不上比Storm優秀,這兩個框架在實時計算領域中,都很優秀,只是擅長的細分場景並不相同;

    Spark Streaming僅僅在吞吐量上比Storm要優秀,但問題是,是不是在所有的實時計算場景下,都那麼注重吞吐量?
    不盡然。
    因此,通過吞吐量說Spark Streaming 強於Storm,不靠譜;

    事實上,Storm在實時延遲度上,比Spark Streaming就好多了,Storm是純實時,Spark Streaming是準實時;而且Storm的事務機制,健壯性/容錯性、動態調整並行度等特性,都要比Spark Streaming更加優秀。

    Spark Streaming的真正優勢(Storm絕對比不上的),是它屬於Spark生態技術棧中,因此Spark Streaming可以和Spark Core、Spark SQL無縫整合,而這也就意味著,我們可以對實時處理出來的中間資料,立即在程式中無縫進行延遲批處理、互動式查詢等操作,這個特點大大增強了Spark Streaming的優勢和功能。

  • 應用場景

    • Storm:
      1、建議在那種需要純實時,不能忍受1s以上延遲的場景下使用,比如金融系統,要求純實時進行金融交易和分析;
      2、如果對於實時計算的功能中,要求可靠的事務機制和可靠性機制,即資料的處理完全精準,一條也不能多,一條也不能少,也可以考慮使用Strom;
      3、如果需要針對高峰低峰時間段,動態調整實時計算程式的並行度,以最大限度利用叢集資源,也可以考慮用Storm;
      4、如果一個大資料應用系統,它就是純粹的實時計算,不需要在中間執行SQL互動式查詢、複雜的transformation運算元等,那麼使用Storm是比較好的選擇

    • Spark Streaming:
      1、如果對上述適用於Storm的三點,一條都不滿足的實時場景,即,不要求純實時,不要求強大可靠的事務機制,不要求動態調整並行度,那麼可以考慮使用Spark Streaming;
      2、考慮使用Spark Streaming最主要的一個因素,應該是針對整個專案進行巨集觀的考慮,即,如果一個專案除了實時計算之外,還包括了離線批處理、互動式查詢等業務功能,而且實時計算中,可能還會牽扯到高延遲批處理、互動式查詢等功能,那麼就應該首選Spark生態,用Spark Core開發離線批處理,用Spark SQL開發互動式查詢,用Spark Streaming開發實時計算,三者可以無縫整合,給系統提供非常高的可擴充套件性。

5、StreamingContext

1、建立方式
有兩種建立StreamingContext的方式:

#方法一:
val conf = new SparkConf().setAppName(appName).setMaster(master);
val ssc = new StreamingContext(conf, Seconds(1));

#方法二:可以使用已有的SparkContext來建立
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1));

注:appName,是用來在Spark UI上顯示的應用名稱;
master,是一個Spark、Mesos或者Yarn叢集的URL,或者是local[*];
batch interval可以根據你的應用程式的延遲要求以及可用的叢集資源情況來設定。

2、一個StreamingContext定義之後,必須做以下幾件事情:

  • 1、通過建立輸入DStream來建立輸入資料來源。
  • 2、通過對DStream定義transformation和output運算元操作,來定義實時計算邏輯。
  • 3、呼叫StreamingContext的start()方法,來開始實時處理資料。
  • 4、呼叫StreamingContext的awaitTermination()方法,來等待應用程式的終止。可以使用CTRL+C手動停止,或者就是讓它持續不斷的執行進行計算。
  • 5、也可以通過呼叫StreamingContext的stop()方法,來停止應用程式。

需要注意的要點:
1、只要一個StreamingContext啟動之後,就不能再往其中新增任何計算邏輯了。比如執行start()方法之後,還給某個DStream執行一個運算元。
2、一個StreamingContext停止之後,是肯定不能夠重啟的,呼叫stop()之後,不能再呼叫start()
3、一個JVM同時只能有一個StreamingContext啟動,在你的應用程式中,不能建立兩個StreamingContext。
4、呼叫stop()方法時,會同時停止內部的SparkContext,如果不希望如此,還希望後面繼續使用SparkContext建立其他型別的Context,比如SQLContext,那麼就用stop(false)。
5、一個SparkContext可以建立多個StreamingContext,只要上一個先用stop(false)停止,再建立下一個即可。

6、DStream的transformation操作

image.png image.png

7、wordCount demo

package cn.spark.study.streaming;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

/**
 * 實時wordcount程式
 */
public class WordCount {
    
    public static void main(String[] args) throws Exception {
        // 建立SparkConf物件
        // 但是這裡有一點不同,我們是要給它設定一個Master屬性,
        //但是我們測試的時候使用local模式
        // local後面必須跟一個方括號,裡面填寫一個數字,數字代表了
        // 我們用幾個執行緒來執行我們的Spark Streaming程式
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("WordCount");  
        
        // 建立JavaStreamingContext物件
        // 該物件,就類似於Spark Core中的JavaSparkContext,Spark SQL中的SQLContext
        // 該物件除了接收SparkConf物件物件之外
        // 還必須接收一個batch interval引數,就是說,每收集多長時間的資料,
        // 劃分為一個batch,進行處理,這裡設定一秒
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    
        // 首先,建立輸入DStream,代表了一個從資料來源(比如kafka、socket)來的持續不斷的實時資料流
        // 呼叫JavaStreamingContext的socketTextStream()方法,可以建立一個數據源為Socket網路埠的
        // 資料流,JavaReceiverInputStream,代表了一個輸入的DStream
        // socketTextStream()方法接收兩個基本引數,第一個是監聽哪個主機上的埠,第二個是監聽哪個埠
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        
        // 到這裡為止,你可以理解為JavaReceiverInputDStream中的,每隔一秒,會有一個RDD,其中封裝了
        // 這一秒傳送過來的資料
        // RDD的元素型別為String,即一行一行的文字
        // 所以,這裡JavaReceiverInputStream的泛型型別<String>,其實就代表了它底層的RDD的泛型型別
        
        // 開始對接收到的資料,執行計算,使用Spark Core提供的運算元,執行應用在DStream中即可
        // 在底層,實際上是會對DStream中的一個一個的RDD,執行我們應用在DStream上的運算元
        // 產生的新RDD,會作為新DStream中的RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));  
            }
            
        });
        
        // 這個時候,每秒的資料,一行一行的文字,就會被拆分為多個單詞,words DStream中的RDD的元素型別
        // 即為一個一個的單詞
        
        // 接著,開始進行flatMap、reduceByKey操作
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word)
                            throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                    
                });
        
        // 在這裡可以看出,用Spark Streaming開發程式,和Spark Core很相像
        // 唯一不同的是Spark Core中的JavaRDD、JavaPairRDD,都變成了JavaDStream、JavaPairDStream
        
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                
                new Function2<Integer, Integer, Integer>() {
            
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                    
                });
        
        // 到此為止,我們就實現了實時的wordcount程式了
        // 總結一下
        // 每秒中傳送到指定socket埠上的資料,都會被lines DStream接收到
        // 然後lines DStream會把每秒的資料,也就是一行一行的文字,諸如hell world,封裝為一個RDD
        // 然後呢,就會對每秒中對應的RDD,執行後續的一系列的運算元操作
        // 比如,對lins RDD執行了flatMap之後,得到一個words RDD,作為words DStream中的一個RDD
        // 以此類推,直到生成最後一個,wordCounts RDD,作為wordCounts DStream中的一個RDD
        // 此時,就得到了,每秒鐘傳送過來的資料的單詞統計
        // 但是,一定要注意,Spark Streaming的計算模型,就決定了,我們必須自己來進行中間快取的控制
        // 比如寫入redis等快取
        // 它的計算模型跟Storm是完全不同的,storm是自己編寫的一個一個的程式,執行在節點上,
        // 相當於一個一個的物件,可以自己在物件中控制快取
        // 但是Spark本身是函數語言程式設計的計算模型,所以,比如在words或pairs DStream中,
        // 沒法在例項變數中進行快取
        // 此時就只能將最後計算出的wordCounts中的一個一個的RDD,寫入外部的快取,或者持久化DB
        
        // 最後,每次計算完,都列印一下這一秒鐘的單詞計數情況
        // 並休眠5秒鐘,以便於我們測試和觀察
        
        Thread.sleep(5000);  
        wordCounts.print();
            
        // 首先對JavaSteamingContext進行一下後續處理
        // 必須呼叫JavaStreamingContext的start()方法,整個Spark Streaming Application才會啟動執行
        // 否則是不會執行的
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}

Scala版本:

package cn.spark.study.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object WordCount {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("WordCount")
        
    val ssc = new StreamingContext(conf, Seconds(1))
    
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap { _.split(" ") }   
    val pairs = words.map { word => (word, 1) }  
    val wordCounts = pairs.reduceByKey(_ + _)  
    
    Thread.sleep(5000)  
    wordCounts.print()  
    
    ssc.start()
    ssc.awaitTermination()
  }
}

https://www.jianshu.com/u/ea8a81e1d960