Spark Streaming(一):DStream
Spark Streaming:Spark提供的,對於大資料進行實時計算的一種框架;它的底層,也是基於Spark Core的;
其基本的計算模型,還是基於記憶體的大資料實時計算模型RDD,只不過,針對實時計算的特點,在RDD之上,進行了一層封裝,叫做DStream(類似Spark SQL中的DataFrame);所以RDD是整個Spark技術生態的核心。
Spark Streaming是Spark Core Api的一種擴充套件,它可以用於進行大規模、高吞吐量、容錯的實時資料流的處理;支援從很多種資料來源中讀取資料,比如Kafka、Flume、Twitter、ZeroMQ、Kinesis或者TCP Socket,並且能夠使用類似高階函式的複雜演算法來進行資料處理,比如map、reduce、join、window;處理後的資料可以被儲存到檔案系統、資料庫、Dashboard等儲存中。
2、Spark Streaming工作原理
Spark Streaming內部的基本工作原理:接收實時輸入資料流,然後將資料拆分成多個batch,比如每收集1s的資料封裝為一個batch, 然後將每個batch交給Spark的計算引擎進行處理,最後會生產出一個結果資料流,其中的資料,也是一個個的batch所組成的。其中,一個batchInterval累加讀取到的資料對應一個RDD的資料
image.png3、DStream
DStream:Discretized Stream,離散流,Spark Streaming提供的一種高階抽象,代表了一個持續不斷的資料流;
DStream可以通過輸入資料來源來建立,比如Kafka、Flume,也可以通過對其他DStream應用高階函式來建立,比如map、reduce、join、window;
- DStream的內部,其實是一系列持續不斷產生的RDD,RDD是Spark Core的核心抽象,即,不可變的,分散式的資料集;
DStream中的每個RDD都包含了一個時間段內的資料;
以下圖為例,0-1這段時間的資料累積構成了RDD@time1,1-2這段時間的資料累積構成了RDD@time2,。。。
- 對DStream應用的運算元,其實在底層會被翻譯為對DStream中每個RDD的操作;
比如對一個DStream執行一個map操作,會產生一個新的DStream,其底層原理為,對輸入DStream中的每個時間段的RDD,都應用一遍map操作,然後生成的RDD,即作為新的DStream中的那個時間段的一個RDD;
底層的RDD的transformation操作,還是由Spark Core的計算引擎來實現的,Spark Streaming對Spark core進行了一層封裝,隱藏了細節,然後對開發人員提供了方便易用的高層次API。
4、Spark Streaming VS Storm
image.png-
優勢
從上圖中可以看出,Spark Streaming絕對談不上比Storm優秀,這兩個框架在實時計算領域中,都很優秀,只是擅長的細分場景並不相同;Spark Streaming僅僅在吞吐量上比Storm要優秀,但問題是,是不是在所有的實時計算場景下,都那麼注重吞吐量?
不盡然。
因此,通過吞吐量說Spark Streaming 強於Storm,不靠譜;事實上,Storm在實時延遲度上,比Spark Streaming就好多了,Storm是純實時,Spark Streaming是準實時;而且Storm的事務機制,健壯性/容錯性、動態調整並行度等特性,都要比Spark Streaming更加優秀。
Spark Streaming的真正優勢(Storm絕對比不上的),是它屬於Spark生態技術棧中,因此Spark Streaming可以和Spark Core、Spark SQL無縫整合,而這也就意味著,我們可以對實時處理出來的中間資料,立即在程式中無縫進行延遲批處理、互動式查詢等操作,這個特點大大增強了Spark Streaming的優勢和功能。
-
應用場景
-
Storm:
1、建議在那種需要純實時,不能忍受1s以上延遲的場景下使用,比如金融系統,要求純實時進行金融交易和分析;
2、如果對於實時計算的功能中,要求可靠的事務機制和可靠性機制,即資料的處理完全精準,一條也不能多,一條也不能少,也可以考慮使用Strom;
3、如果需要針對高峰低峰時間段,動態調整實時計算程式的並行度,以最大限度利用叢集資源,也可以考慮用Storm;
4、如果一個大資料應用系統,它就是純粹的實時計算,不需要在中間執行SQL互動式查詢、複雜的transformation運算元等,那麼使用Storm是比較好的選擇 -
Spark Streaming:
1、如果對上述適用於Storm的三點,一條都不滿足的實時場景,即,不要求純實時,不要求強大可靠的事務機制,不要求動態調整並行度,那麼可以考慮使用Spark Streaming;
2、考慮使用Spark Streaming最主要的一個因素,應該是針對整個專案進行巨集觀的考慮,即,如果一個專案除了實時計算之外,還包括了離線批處理、互動式查詢等業務功能,而且實時計算中,可能還會牽扯到高延遲批處理、互動式查詢等功能,那麼就應該首選Spark生態,用Spark Core開發離線批處理,用Spark SQL開發互動式查詢,用Spark Streaming開發實時計算,三者可以無縫整合,給系統提供非常高的可擴充套件性。
-
5、StreamingContext
1、建立方式
有兩種建立StreamingContext的方式:
#方法一:
val conf = new SparkConf().setAppName(appName).setMaster(master);
val ssc = new StreamingContext(conf, Seconds(1));
#方法二:可以使用已有的SparkContext來建立
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1));
注:appName,是用來在Spark UI上顯示的應用名稱;
master,是一個Spark、Mesos或者Yarn叢集的URL,或者是local[*];
batch interval可以根據你的應用程式的延遲要求以及可用的叢集資源情況來設定。
2、一個StreamingContext定義之後,必須做以下幾件事情:
- 1、通過建立輸入DStream來建立輸入資料來源。
- 2、通過對DStream定義transformation和output運算元操作,來定義實時計算邏輯。
- 3、呼叫StreamingContext的start()方法,來開始實時處理資料。
- 4、呼叫StreamingContext的awaitTermination()方法,來等待應用程式的終止。可以使用CTRL+C手動停止,或者就是讓它持續不斷的執行進行計算。
- 5、也可以通過呼叫StreamingContext的stop()方法,來停止應用程式。
需要注意的要點:
1、只要一個StreamingContext啟動之後,就不能再往其中新增任何計算邏輯了。比如執行start()方法之後,還給某個DStream執行一個運算元。
2、一個StreamingContext停止之後,是肯定不能夠重啟的,呼叫stop()之後,不能再呼叫start()
3、一個JVM同時只能有一個StreamingContext啟動,在你的應用程式中,不能建立兩個StreamingContext。
4、呼叫stop()方法時,會同時停止內部的SparkContext,如果不希望如此,還希望後面繼續使用SparkContext建立其他型別的Context,比如SQLContext,那麼就用stop(false)。
5、一個SparkContext可以建立多個StreamingContext,只要上一個先用stop(false)停止,再建立下一個即可。
6、DStream的transformation操作
image.png image.png7、wordCount demo
package cn.spark.study.streaming;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
/**
* 實時wordcount程式
*/
public class WordCount {
public static void main(String[] args) throws Exception {
// 建立SparkConf物件
// 但是這裡有一點不同,我們是要給它設定一個Master屬性,
//但是我們測試的時候使用local模式
// local後面必須跟一個方括號,裡面填寫一個數字,數字代表了
// 我們用幾個執行緒來執行我們的Spark Streaming程式
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("WordCount");
// 建立JavaStreamingContext物件
// 該物件,就類似於Spark Core中的JavaSparkContext,Spark SQL中的SQLContext
// 該物件除了接收SparkConf物件物件之外
// 還必須接收一個batch interval引數,就是說,每收集多長時間的資料,
// 劃分為一個batch,進行處理,這裡設定一秒
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// 首先,建立輸入DStream,代表了一個從資料來源(比如kafka、socket)來的持續不斷的實時資料流
// 呼叫JavaStreamingContext的socketTextStream()方法,可以建立一個數據源為Socket網路埠的
// 資料流,JavaReceiverInputStream,代表了一個輸入的DStream
// socketTextStream()方法接收兩個基本引數,第一個是監聽哪個主機上的埠,第二個是監聽哪個埠
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// 到這裡為止,你可以理解為JavaReceiverInputDStream中的,每隔一秒,會有一個RDD,其中封裝了
// 這一秒傳送過來的資料
// RDD的元素型別為String,即一行一行的文字
// 所以,這裡JavaReceiverInputStream的泛型型別<String>,其實就代表了它底層的RDD的泛型型別
// 開始對接收到的資料,執行計算,使用Spark Core提供的運算元,執行應用在DStream中即可
// 在底層,實際上是會對DStream中的一個一個的RDD,執行我們應用在DStream上的運算元
// 產生的新RDD,會作為新DStream中的RDD
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
// 這個時候,每秒的資料,一行一行的文字,就會被拆分為多個單詞,words DStream中的RDD的元素型別
// 即為一個一個的單詞
// 接著,開始進行flatMap、reduceByKey操作
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word)
throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
// 在這裡可以看出,用Spark Streaming開發程式,和Spark Core很相像
// 唯一不同的是Spark Core中的JavaRDD、JavaPairRDD,都變成了JavaDStream、JavaPairDStream
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 到此為止,我們就實現了實時的wordcount程式了
// 總結一下
// 每秒中傳送到指定socket埠上的資料,都會被lines DStream接收到
// 然後lines DStream會把每秒的資料,也就是一行一行的文字,諸如hell world,封裝為一個RDD
// 然後呢,就會對每秒中對應的RDD,執行後續的一系列的運算元操作
// 比如,對lins RDD執行了flatMap之後,得到一個words RDD,作為words DStream中的一個RDD
// 以此類推,直到生成最後一個,wordCounts RDD,作為wordCounts DStream中的一個RDD
// 此時,就得到了,每秒鐘傳送過來的資料的單詞統計
// 但是,一定要注意,Spark Streaming的計算模型,就決定了,我們必須自己來進行中間快取的控制
// 比如寫入redis等快取
// 它的計算模型跟Storm是完全不同的,storm是自己編寫的一個一個的程式,執行在節點上,
// 相當於一個一個的物件,可以自己在物件中控制快取
// 但是Spark本身是函數語言程式設計的計算模型,所以,比如在words或pairs DStream中,
// 沒法在例項變數中進行快取
// 此時就只能將最後計算出的wordCounts中的一個一個的RDD,寫入外部的快取,或者持久化DB
// 最後,每次計算完,都列印一下這一秒鐘的單詞計數情況
// 並休眠5秒鐘,以便於我們測試和觀察
Thread.sleep(5000);
wordCounts.print();
// 首先對JavaSteamingContext進行一下後續處理
// 必須呼叫JavaStreamingContext的start()方法,整個Spark Streaming Application才會啟動執行
// 否則是不會執行的
jssc.start();
jssc.awaitTermination();
jssc.close();
}
}
Scala版本:
package cn.spark.study.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("WordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap { _.split(" ") }
val pairs = words.map { word => (word, 1) }
val wordCounts = pairs.reduceByKey(_ + _)
Thread.sleep(5000)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
https://www.jianshu.com/u/ea8a81e1d960