通過案例對SparkStreaming透徹理解(1)
本博文主要包含內容為:
1、spark streaming另類線上實驗
2、瞬間理解spark streaming本質
一,對SparkStreaming的深入理解:
1、 首先為何從Spark Streaming切入Spark定製?Spark的子框架已有若干,為何選擇Spark Streaming?
Spark最開始只有Spark Core,沒有目前的這些子框架。這些子框架是構建於Spark Core之上的。沒有哪個子框架能擺脫Spark Core。我們通過對一個框架的徹底研究,肯定可以領會Spark力量的源泉,並精通所有問題的解決之道。
Spark SQL涉及了很多SQL語法細節的解析和優化,當然分析其解析、優化從而集中精力去研究Spark而言是一件重要的事情,但不是最重要的事情,所以Spark SQL不太適合作為具體的子框架值得我們去研究。
目前Spark R現在不成熟,支撐功能有限。
圖計算,從各版本演進而言Graphx幾乎沒有改進,這種趨勢,Graphx是不是已經發展基本到盡頭了;另外圖計算而言有很多數學級別的演算法,而要把Spark做到極致,數學對我們來說重要,但對於研究而言不是最重要的。
Mechine Learning在封裝了Vector向量、Metrics構建了眾多的演算法庫,從而涉及了太多的數學知識,所有選擇ML其實也不是太好的選擇。
最後篩選出SparkStreaming子框架才是最佳的研究切入黃金點。
2、對SparkStreaming的理解?
- Spark Streaming是流式計算,當今時代是一個流處理時代,一切資料如果不是流式處理, 或者說和流式處理不相關的話,都是無效的資料。
-流式處理才是我們對大資料的初步印象,而不是批處理和資料探勘,當然Spark強悍的地方在於,他的流式處理可以線上的直接使用機器學習、圖計算、SparkSQL、Spark R的成果。 - 整個Spark的程式,基於Spark Streaming的最容易出問題,也是最受關注的地方,也是最需要人才的地方。
- Spark Streaming和其他子框架的不同之處,Spark Streaming很像基於Spark Core之上的應用程式。
- 正如世界萬物發展一樣,任何技術都有其關鍵點或轉折點,SparkStreaming相當於獨孤九劍,SparkCore 相當於易筋經。SparkStreaming執行在SparkCore上,所以很多效能調優都是建立在SparkCore上的;Spark是大資料的龍脈,SparkStreaming是龍脈的穴位。尋龍點穴,Spark就是龍脈,Spark Streaming就是穴位
3、當今現狀
2015年是流式處理的一年。大家考慮用Spark,主要也是因為Spark Streaming。這是一個流處理的時代,一切資料如果與流式處理不相關的話,都是無效的資料。Spark之所以強悍的一個重要原因在於,它的流式處理可以線上使用圖計算、機器學習或者SparkR的成果,這得益於Spark一體化、多元化的基礎架構設計。也就是在Spark Streaming中可以呼叫其它子框架,無需任何設定。這是Spark的無可匹敵之處,也是Spark Streaming必將一統天下的根源。但Spark的應用中,Spark Streaming也是最容易出問題的。
Spark Streaming與其它子框架不同之處在於,它更像是Spark Core之上的一個應用程式。所以如果要做Spark的定製開發,Spark Streaming則提供了最好的參考。你想掌握Spark Streaming,但你不去精通Spark Core的話,那是不可能的。所以我們選擇Spark Streaming來提升自己,是找到了關鍵點。
二:通過案例來深入理解SparkStreaming工作原理
1、研究SparkStreaming時,有困惑你的東西,SparkStreaming資料不斷流進來,根據batchInterval時間片不斷生成Job,並將Job提交叢集處理,如果能清晰的看到資料的流入和資料的處理,你心裡會很很踏實。
如何能清晰的看到資料的處理過程呢?只需要一個小技巧:就是把SparkStreaming中的batchInterval放的足夠大,例如說從30秒調整為1分鐘一次batch,或者5分鐘一次batch,你會很清晰的看到整個流程式的執行過程。在這裡利用上篇博文的程式碼
/**
* Created by hadoop on 2016/4/18.
* 背景描述 在廣告點選計費系統中 我們線上過濾掉 黑名單的點選 進而保護廣告商的利益
* 只有效的廣告點選計費
*
*/
object OnlineBlackListFilter {
def main(args: Array[String]){
/**
* 第1步:建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊,
* 例如說通過setMaster來設定程式要連結的Spark叢集的Master的URL,如果設定
* 為local,則代表Spark程式在本地執行,特別適合於機器配置條件非常差(例如
* 只有1G的記憶體)的初學者。
*/
// 建立SparkConf物件
val conf = new SparkConf()
// 設定應用程式的名稱,在程式執行的監控介面可以看到名稱
conf.setAppName("OnlineBlackListFilter")
// 此時,程式在Spark叢集
conf.setMaster("spark://Master:7077")
val ssc = new StreamingContext(conf, Seconds(30))
/**
* 黑名單資料準備,實際上黑名單一般都是動態的,例如在Redis或者資料庫中,
* 黑名單的生成往往有複雜的業務邏輯,具體情況演算法不同,
* 但是在Spark Streaming進行處理的時候每次都能夠訪問完整的資訊。
*/
val blackList = Array(("Spy", true),("Cheater", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
val adsClickStream = ssc.socketTextStream("Master", 9999)
/**
* 此處模擬的廣告點選的每條資料的格式為:time、name
* 此處map操作的結果是name、(time,name)的格式
*/
val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
adsClickStreamFormatted.transform(userClickRDD => {
// 通過leftOuterJoin操作既保留了左側使用者廣告點選內容的RDD的所有內容,
// 又獲得了相應點選內容是否在黑名單中
val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
/**
* 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean))
* 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行leftOuterJoin的時候是否存在的值。
* 如果存在的話,表面當前廣告點選是黑名單,需要過濾掉,否則的話是有效點選內容;
*/
val validClicked = joinedBlackListRDD.filter(joinedItem => {
if(joinedItem._2._2.getOrElse(false))
{
false
} else {
true
}
})
validClicked.map(validClick => {validClick._2._1})
}).print
/**
* 計算後的有效資料一般都會寫入Kafka中,下游的計費系統會從kafka中pull到有效資料進行計費
*/
ssc.start()
ssc.awaitTermination()
}
}
2、 把程式的Batch Interval設定成300秒:
3、 重新生成一下jar包 。
4、啟動Hadoop的HDFS、 啟動Spark叢集,啟動spark的History Server,並且通過web介面是否啟動成功,開啟資料傳送的埠 : nc -lk 9999
5、利用指令碼, 用spark-submit執行前面生成的jar包 。
6、 在資料傳送埠輸入若干資料,形式比如:
333333 Hadoop
222222 spark
111111 hadoop
555555 Kafka
6666666 Demo
999999 SparkSQL
7、出現如下結果說明執行成功:
8、開啟瀏覽器,看History Server裡面的最新的日誌資訊,看我們目前執行的應用程式中有些什麼Job:
總共竟然有5個Job。這完全不是我們此前做Spark SQL之類的應用程式時看到的樣子。
我們接下來看一看這些Job的內容,主要揭示一些現象,不會做完全深入的剖析,只是為了先讓大家進行一些思考。
- Job 0:此Job不體現我們的業務邏輯程式碼。這個Job是出於對後面計算的負載均衡的考慮。
發現此Stage在所有Executor上都存在。
- Job 1:執行時間比較長,耗時5.2分鐘。
點選Stage 2的連結,進去看看Aggregated Metrics By Executor部分:
可以知道,Stage 2只在Worker1上的一個Executor執行,而且執行了5.2分鐘。
是否會覺得奇怪:從業務處理的角度看,我們傳送的那麼一點資料,沒有必要去啟動一個執行5.2分鐘的任務吧。那這個任務是做什麼呢?
從DAG Visualization部分,就知道此Job實際就是啟動了一個接收資料的Receiver:
**原來Receiver是通過一個Job來啟動的。那肯定有一個Action來觸發它。
只有一個Worker執行此Job。是用於接收資料。**
Locality Level是PROCESS_LOCAL,原來是記憶體節點。所以,預設情況下,只要資料不是特別大,資料接收不會使用磁碟,而是直接使用記憶體中的資料。
看來,Spark Streaming應用程式啟動後,自己會啟動一些Job。預設啟動了一個Job來接收資料,為後續處理做準備。
重要啟示:一個Spark應用程式中可以啟動很多Job,而這些不同的Job之間可以相互配合。這一認識為我們寫複雜Spark程式奠定了良好的基礎。
- Job 2:看Details可以發現有我們程式的主要業務邏輯,體現在Stag 3、Stag4、Stag 5中。
我們看Stag3、Stage4的詳情,可以知道這2個Stage都是用2個Executor執行的。所有資料處理是在2臺機器上進行的。
Stag 5只在Worker1上。這是因為這個Stage有Shuffle操作。
- Job3:有Stage 6、Stage 7、Stage 8。其中Stage 6、Stage 7被跳過。
看看Stage 8的Aggregated Metrics by Executor部分。可以看到,資料處理是在2臺機器上進行的:
- Job4:也體現了我們應用程式中的業務邏輯 。有Stage 9、Stage 10、Stage 11。其中Stage 9、Stage 10被跳過。
看看Stage 11的詳情。可以看到,資料處理是在2臺機器上進行的
綜合以上的現象可以知道,Spark Streaming的一個應用中,運行了這麼多Job,遠不是我們從網路部落格或者書籍上看的那麼簡單。
我們有必要通過這些現象,反過來回溯去尋根問源。不過這次暫不做深入分析。
我們的神奇之旅才剛剛開始。
三、SparkStreaming本質的深入理解
- Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各種來源的實時輸入資料,進行處理後,處理結果儲存在HDFS、Databases等各種地方。
- Spark Streaming接收這些實時輸入資料流,會將它們按批次劃分,然後交給Spark引擎處理,生成按照批次劃分的結果流。
- Spark Streaming提供了表示連續資料流的、高度抽象的被稱為離散流的DStream。DStream本質上表示RDD的序列。任何對DStream的操作都會轉變為對底層RDD的操作。
- Spark Streaming使用資料來源產生的資料流建立DStream,也可以在已有的DStream上使用一些操作來建立新的DStream。
在我們前面的實驗中,每300秒會產生一批資料,基於這批資料會生成RDD,進而觸發Job,執行處理。
DStream是一個沒有邊界的集合,沒有大小的限制。
DStream代表了時空的概念。隨著時間的推移,裡面不斷產生RDD。
鎖定到時間段後,就是空間的操作。也就是對本時間段的對應批次的資料的處理。
下面用例項來講解資料處理過程。
資料處理會有若干個對DStream的操作,這些操作之間的依賴關係,構成了DStreamGraph。如以下圖例所示:
上圖中每個foreach都會觸發一個作業,就會順著依賴從後往前回溯,形成DAG,如下圖所示:
空間維度確定之後,隨著時間不斷推進,會不斷例項化RDD Graph,然後觸發Job去執行處理。
四、接下來我們要做的就是重新閱讀官網的SparkStreaming
博文內容源自DT大資料夢工廠Spark課程總結的筆記。相關課程內容視訊可以參考:
百度網盤連結:http://pan.baidu.com/s/1slvODe1(如果連結失效或需要後續的更多資源,請聯絡QQ460507491或者微訊號:DT1219477246 獲取上述資料)。