Spark面試題(二)
一、spark streaming和storm有何區別?
一個實時毫秒,一個準實時亞秒,不過storm的吞吐率比較低。
二、spark有哪些元件?
Master:管理叢集和節點,不參與計算。
Worker:計算節點,程序本身不參與計算,和master彙報。
Driver:執行程式的main方法,建立sparkcontext物件。
Spark context:控制整個application的生命週期,包括DAGSchedular和TaskSchedular等元件。
Client:使用者提交程式的入口。
三、spark的工作機制
使用者在client端提交作業後,會由Driver執行main方法並建立spark context。
執行RDD運算元,形成DAG圖輸入DAGSchedular,按照RDD之間的依賴關係劃分stage輸入TaskSchedular。
TaskSchedular會將stage劃分為task set分發到各個節點的executor中執行。
四、spark中的寬窄依賴
RDD和他依賴的父RDD的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
寬依賴:指的是多個子RDD的partition會依賴同一個父RDD的partition。
窄依賴:指的是每一個父RDD的partition最多被子RDD的一個Partition使用。
五、spark中如何劃分stage?
1.spark application中可以因為不同的action觸發眾多的job,一個Application中可以有很多job,每個job是有一個或多個stage構成的,後面的stage依賴於前面的stage,也就是說只有前面的stage計算完畢後,後面的stage才會執行。
2.stage劃分的依據是寬依賴,何時產生寬依賴,例如ReduceBykey,GroupByKey的運算元,會導致寬依賴的產生。
3.由Action運算元(例如collect)導致了SparkContext.RunJob的執行,最終導致了DAGSchedular的submitJob的執行,其核心是通過傳送一個case class Jobsubmitted物件給eventProcessLoop。
EventProcessLoop是DAGSchedularEventProcessLoop的具體事例,而DAGSchedularEventProcessLoop是eventLoop的子類,具體實現EventLoop的onReceiver方法,onReceiver方法轉過來回調doOnReceive。
4.在handleJobSubmitted中首先建立finalStage,建立finalStage時候會建立父Stage的依賴鏈條。
總結:依賴是從程式碼的邏輯層面上來展開說的,可以簡單點說:寫介紹什麼是RDD中的寬窄依賴,然後再根據DAG有向無環圖進行劃分,從當前job的最後一個運算元往前推,遇到寬依賴,那麼當前在這個批次中的所有運算元操作都劃分成一個stage,然後繼續按照這種方式再繼續往前推,如再遇到寬依賴,又劃分成一個stage,一直到最前面的一個運算元。最後整個job會被劃分成多個stage,而stage之間又存在依賴關係,後面的stage依賴於前面的stage。
六、spark-submit的時候如何引入外部jar包
在通過spark-submit提交任務時,可以通過新增配置引數來指定:
--driver-class-path 外部jar包
--jars 外部jar包
七、spark中cache和persist的區別?
Cache:快取資料,預設是快取在記憶體中,,其本質還是呼叫persist
Persist:快取資料,有豐富的快取策略。資料可以儲存在記憶體也可以儲存在磁碟中,使用的時候指定對應的快取級別。
八、flume整合Spark Streaming問題。
(1)如何實現Spark Streaming讀取flume中的資料
可以這樣說:
前期經過技術調研,在檢視官網資料,發現Spark Streaming整合flume有兩種方式:拉模式,推模式。
拉模式:Flume把資料push到Spark Streaming
推模式:Spark Streaming從flume中poll資料
(2)在實際開發的時候是如何保證資料不丟失的
可以這樣說:
flume那邊採用的channel是將資料落地到磁碟中,保證資料來源端安全性(可以在補充一下,flume在這裡的channel可以設定為memory記憶體中,提高資料接收處理的效率,但是由於資料在記憶體中,安全機制保證不了,故選擇channel為磁碟儲存。整個流程執行有一點的延遲性)
Spark Streaming通過拉模式整合的時候,使用了FlumeUtils這樣一個類,該類是需要依賴一個額外的jar包(spark-streaming-flume_2.10)
要想保證資料不丟失,資料的準確性,可以在構建StreamingConext的時候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)來建立一個StreamingContext,使用StreamingContext.getOrCreate來建立StreamingContext物件,傳入的第一個引數是checkpoint的存放目錄,第二引數是生成StreamingContext物件的使用者自定義函式。如果checkpoint的存放目錄存在,則從這個目錄中生成StreamingContext物件;如果不存在,才會呼叫第二個函式來生成新的StreamingContext物件。在creatingFunc函式中,除了生成一個新的StreamingContext操作,還需要完成各種操作,然後呼叫ssc.checkpoint(checkpointDirectory)來初始化checkpoint功能,最後再返回StreamingContext物件。
這樣,在StreamingContext.getOrCreate之後,就可以直接呼叫start()函式來啟動(或者是從中斷點繼續執行)流式應用了。如果有其他在啟動或繼續執行都要做的工作,可以在start()呼叫前執行。
流失計算中使用checkpoint的作用:
儲存元資料,包括流式應用的配置、流式沒崩潰之前定義的各種操作、未完成所有操作的batch。元資料被儲存到容忍失敗的儲存系統上,如HDFS。這種ckeckpoint主要針對driver失敗後的修復。
儲存流式資料,也是儲存到容忍失敗的儲存系統上,如HDFS。這種ckeckpoint主要針對window operation、有狀態的操作。無論是driver失敗了,還是worker失敗了,這種checkpoint都夠快速恢復,而不需要將很長的歷史資料都重新計算一遍(以便得到當前的狀態)。
設定流式資料checkpoint的週期
對於一個需要做checkpoint的DStream結構,可以通過呼叫DStream.checkpoint(checkpointInterval)來設定ckeckpoint的週期,經驗上一般將這個checkpoint週期設定成batch週期的5至10倍。
使用write ahead logs功能
這是一個可選功能,建議加上。這個功能將使得輸入資料寫入之前配置的checkpoint目錄。這樣有狀態的資料可以從上一個checkpoint開始計算。開啟的方法是把spark.streaming.receiver.writeAheadLogs.enable這個property設定為true。另外,由於輸入RDD的預設StorageLevel是MEMORY_AND_DISK_2,即資料會在兩臺worker上做replication。實際上,Spark Streaming模式下,任何從網路輸入資料的Receiver(如kafka、flume、socket)都會在兩臺機器上做資料備份。如果開啟了write ahead logs的功能,建議把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在建立RDD時由引數傳入。
使用以上的checkpoint機制,確實可以保證資料0丟失。但是一個前提條件是,資料傳送端必須要有快取功能,這樣才能保證在spark應用重啟期間,資料傳送端不會因為spark streaming服務不可用而把資料丟棄。而flume具備這種特性,同樣kafka也具備。
九、hadoop和spark的shuffle相同和差異?
(1)從high-level的角度來看,兩者並沒有大的差別。都是將mapper(Spark中是ShuffleMapTask)的輸出進行partition,不同的partition送到不同的reducer(Spark裡的reducer可能是下一個stage的ShuffleMapTask,也可能是ResultTask)。Reducer以記憶體做緩衝區,邊shuffle邊aggregate資料,等資料aggregate好之後再進行reduce()(Spark裡可能是後續的一系列操作)
(2)從low-level的角度來看,兩者差距不小。Hadoop MapReduce是sort-based,進入combiner()和reduce()的records必須先sort。這樣的好處在於combiner()/reduce()可以處理大規模的資料,因為其輸入資料可以通過外排得到(mapper對每段資料先做排序,reducer的shuffle對排好序的每段資料做歸併)。目前spark選擇的是hash-based,通常使用HashMap對shuffle來的資料進行aggregate,不會對資料進行提前排序。如果使用者需要進行排序的資料,那麼要自己呼叫類似SortByKey()的操作。
(3)從現實角度來看,兩者也有不小差距。Hadoop MapReduce將處理流程劃分出明顯的幾個階段:map(),spill,merge,shuffle,sort,reduce()等。每個階段各司機制,可以按照過程式的程式設計思想來逐一實現每個階段的功能。在Spark中,沒有這樣功能明確的階段,只有不同的stage和一系列的transformation(),所以spill、sort、aggregate等操作需要蘊含在transformation()中。如果我們將map()端劃分資料、持久化資料的過程稱為shuffle write,而將reducer讀入資料、aggregate資料的過程稱為shuffle read。那麼在spark中,問題就變成怎麼在job的邏輯或者物理執行圖中加入shuffle write、shuffle read的處理邏輯,以及兩個處理邏輯怎麼高效實現。Shuffle write由於不要求資料有序,shuffle write的任務很簡單:將資料partition好,並持久化。之所以要持久化,一方面是要減少記憶體儲存空間壓力,另一方面也是為了fault-tolerance。
十、RDD的五大特性
(1) A list of partition
一個RDD有一系列的分割槽/分片
- A function for computing each split/partition
對RDD的每一個分割槽/分片都作用同一個函式
- A list of dependencies on others RDDs
有一些依賴,在其他的RDD上
- Optionally,a Partitioner for key-value RDDs(e.g to say that the RDD is hash-partitioned)
可選的,對於key-value的RDD的分割槽策略。
- Optionally,a list of preferred locations to compute each split on(e.g. block locations for an HDFS file)
可選的,資料在哪兒優先把作業排程到資料所在節點進行計算:移動資料不如移動計算
十一、spark的優勢和劣勢
優勢:
1.速度快
2.其次,Spark是一個靈活的運算框架,適合做批次處理、工作流、互動式分析、流量處理等不同型別的應用,因此spark也可以成為一個用途廣泛的運算引擎,並在未來取代MapReduce的地位
3.最後,Spark可以與Hadoop生態系統的很多元件互相操作。Spark可以執行在新一代資源管理框架YARN上,它還可以讀取已有並存放在Hadoop上的資料,這是個非常大的優勢
劣勢:
1.穩定性方面
2.不能處理大資料
3.不能支援複雜的SQL統計
十二、spark的shuffle過程
1.Spark的shuffle總體而言就包括兩個基本的過程:Shuffle write和Shuffle read。ShuffleMapTask的整個執行過程就是Shuffle write。將資料根據hash的結果,將各個Reduce分割槽的資料寫到各自的磁碟中,寫資料時不做排序操作。
2.首先是將map的輸出結果送到對應的緩衝區bucket中,每個bucket裡的檔案都會被寫入本地磁碟檔案ShuffleBlockFile中,形成一個FileSegment檔案。
3.Shuffle Read指的是reducer對屬於自己的FileSegment檔案進行fetch操作,這裡採用的netty框架,fetch操作會等到所有的Shuffle write過程結束後再進行,.reducer通過fetch得到的FileSegment先放在緩衝區softBuffer中,預設大小45MB。
十三、spark sql為什麼比hive快?
1.消除了冗餘的HDFS讀寫
2.消除了冗餘的MapReduce階段
3.JVM的優化
十四、Spark工作的一個流程
1.構造Spark Application的執行環境(啟動SparkContext),SparkContext向資源管理器(可以是standalone、Mesos或Yarn)註冊並申請執行Executor資源;
2.資源管理器分配Executor資源,Executor執行情況將隨著心跳傳送到資源管理器上;
3.SparkContext構建DAG圖,將DAG圖分解成Stage,並將Taskset傳送給TaskSchedular。Executor向SparkContext申請Task,TaskSchedular將Task傳送給Executor運行同時SparkContext將應用程式程式碼傳送給Executor。
4.Task在Executor上執行,執行完畢釋放所有資源。
十五、對Spark streaming進行效能優化?
1.降低批次處理時間:
①資料接收並行度。
(1)增加DStream:接收網路資料(如Kafka,flume,Socket等)時會對資料進行反序列化再儲存在Spark,由於一個DStream只有Receiver物件,如果成為瓶頸可考慮增加DStream。
(2)設定”spark.streaming.blockInterval”引數:接受的資料被儲存在Spark記憶體前,會被合併成block,而block數量決定了task數量;舉例,當批次時間間隔為2秒且block時間間隔為200毫秒時,Task數量約為10;如果Task數量過低,則浪費了cpu資源;推薦的最小block時間間隔為50ms。
(3)顯式對Input DStream重新分割槽:再進行更深層次處理前,先對輸入資料進行重新分割槽。
②資料處理並行度:reduceByKey,reduceByKeyAndWindow等operation可通過設定”spark.default.parallelism”引數或顯式設定並行度方法引數控制。
③資料序列化:可配置更高效的kryo序列化。
2.設定合理批次時間間隔:
①原則:處理資料的速度應大於或等於資料輸入的速度,即批次處理時間大於或等於批次時間間隔。
②方法:
(1)先設定批次時間間隔為5~10秒資料輸入速度;
(2)再通過檢視log4j日誌中的”Total delay”,逐步調整批次時間間隔,保證”Total delay”小於批次時間間隔。
3.記憶體調優:
①持久化級別:開啟壓縮,設定引數”spark.rdd.compress”;
②GC策略:在Driver和Executor上開啟CMS(Content Management System 內容管理系統)
十六、Spark on Yarn VS standalone
Yarn:你只需要一個節點,然後提交作業即可。這個是不需要spark叢集的(不需要啟動master和worker)
Standalone:你的spark叢集上每個節點上都要部署spark,然後需要啟動spark叢集。
十七、Spark on Yarn的兩種模式
Spark on Yarn支援client和cluster模式:driver執行在哪裡
Client:driver執行在本地,提交作業的程序是不能停止的,否則作業就掛了。
Cluster:提交完作業,那麼提交作業端就可以斷開了,因為driver執行在am(application master)端。
十八、Spark和Hadoop重要概念區分
十九、spark優化之記憶體管理。
Spark中的記憶體管理主要分為兩個方面:執行和儲存。
執行端的記憶體主要是涉及到shuffle,join,sorts和aggregatations時的計算,儲存端的記憶體主要涉及到cache。在spark中,執行和儲存都是共享一個統一的region。當執行端沒有使用記憶體時,儲存端就能獲得所有的記憶體資訊,反之一樣。在必要的時候,執行可以剔除儲存,但是儲存的時候可以設定一個閾值。
還可以看一個RDD消耗多少記憶體,在webUI或者使用SizeEstimator’s estimate方法。
記憶體使用的百分比是(堆記憶體-300MB)*0.6,執行和儲存各佔50%
二十、spark優化之廣播變數。
使用廣播變數在sparkContext中,可以大幅降低每一個序列化task這個物件的大小,叢集中啟動一個job的成本也會降低。如果你的task中使用了一個大物件(large object),考慮把他優化成一個廣播變數。通常來說,一個task大於20KB就值得優化。
二十一、spark優化之資料本地性。
資料本地性是有很大的影響在Spark job的程式中。如果資料和程式碼在一起,計算速度就會非常快。但是如果資料和程式碼是分開的,一個必須要移動到另外一個上去。通常情況下是把序列化後的程式碼移動到資料所在的節點上,因為程式碼的大小比資料小很多(移動計算,而不是移動資料)。Spark構建的排程就是基於資料本地性。
資料本地性指的是資料和程式碼有多近(close)。由近及遠有下面locality level:
1.PROCESS_LOCAL:資料在一個相同的正在執行的程式碼的JVM中。
2.NODE_LOCAL:資料在同一個節點。
3.NO_PREF:資料不管在哪裡都可以快速的訪問到。(無本地性)
4.RACK_LOCAL:資料在相同的機架上。但是資料在同一個機架的不同server上,需要通過網路傳輸。
5.ANY:資料在網路的其他地方,不在一個機架上。
Spark會優先安排作業在最佳的locality level上,但是不太可能。
二十二、Spark on Yarn 模式有哪些優點?
1)與其他計算框架共享叢集資源(eg.Spark框架與MapReduce框架同時執行,如果不用Yarn進行資源分配,MapReduce分到的記憶體資源會很少,效率低下);資源按需分配,進而提高叢集資源利用等。
2)相較於Spark自帶的Standalone模式,Yarn的資源分配更加細緻
3)Application部署簡化,例如Spark,Storm等多種框架的應用由客戶端提交後,由Yarn負責資源的管理和排程,利用Container作為資源隔離的單位,以它為單位去使用記憶體,cpu等。
4)Yarn通過佇列的方式,管理同時執行在Yarn叢集中的多個服務,可根據不同型別的應用程式負載情況,調整對應的資源使用量,實現資源彈性管理。
二十三、spark中task有幾種型別?
2種類型:1)result task型別,最後一個task,2)是shuffleMapTask型別,除了最後一個task都是。
二十四、spark中map和mapPartition的區別?
rdd的mapPartitions是map的一個變種,它們都可進行分割槽的並行處理。
兩者的主要區別是呼叫的粒度不一樣:map的輸入變換函式是應用於RDD中每個元素,而mapPartitions的輸入函式是應用於每個分割槽。
假設一個rdd有10個元素,分成3個分割槽。如果使用map方法,map中的輸入函式會被呼叫10次;而使用mapPartitions方法的話,其輸入函式會只會被呼叫3次,每個分割槽呼叫1次。
這兩個方法的另一個區別是在大資料集情況下的資源初始化開銷和批處理處理,如果在map和mapPartition中都要初始化一個耗時的資源,然後使用,比如資料庫連線。在上面的例子中,mapPartition只需初始化3個資源(3個分割槽每個1次),而map要初始化10次(10個元素每個1次),顯然在大資料集情況下(資料集中元素個數遠大於分割槽數),mapPartitons的開銷要小很多,也便於進行批處理操作。
mapPartitionsWithIndex和mapPartitons類似,只是其引數多了個分割槽索引號。
二十四、python開發spark如何在提交作業的時候新增python的第三方模組?
可以使用--py--files引數,但是應放在執行指令碼的前面。所有的import操作必須在context完成之後。
二十五、什麼是Spark Executor?
當SparkContext連線到叢集管理器時,它會在叢集中的節點上獲取Executor。 executor是Spark程序,它執行計算並將資料儲存在工作節點上。 SparkContext的最終任務被轉移到executors以執行它們。