面試系列五 之 專案涉及技術Spark
一、Spark
1.1 Spark有幾種部署方式?請分別簡要論述
-
1)Local:執行在一臺機器上,通常是練手或者測試環境。
-
2)Standalone:構建一個基於Mster+Slaves的資源排程叢集,Spark任務提交給Master執行。是Spark自身的一個排程系統。
-
3)Yarn: Spark客戶端直接連線Yarn,不需要額外構建Spark叢集。有yarn-client和yarn-cluster兩種模式,主要區別在於:Driver程式的執行節點。
-
4)Mesos:國內大環境比較少用。
1.2 Spark任務使用什麼進行提交,javaEE介面還是指令碼
Shell 指令碼。
1.3 Spark提交作業引數(重點)
1)在提交任務時的幾個重要引數
-
executor-cores —— 每個executor使用的核心數,預設為1,官方建議2-5個,我們企業是4個
-
num-executors —— 啟動executors的數量,預設為2
-
executor-memory —— executor記憶體大小,預設1G
-
driver-cores —— driver使用核心數,預設為1
-
driver-memory —— driver記憶體大小,預設512M
2)邊給一個提交任務的樣式
spark-submit \ --master local\[5\] \ --driver-cores 2 \ --driver-memory 8g \ --executor-cores 4 \ --num-executors 10 \ --executor-memory 8g \ --class PackageName.ClassName XXXX.jar \ --name "Spark Job Name" \ InputPath \ OutputPath
1.4 簡述Spark的架構與作業提交流程(畫圖講解,註明各個部分的作用)(重點)
參考: https://blog.csdn.net/wuxintdrh/article/details/70956686
1.4.1、standlone
1.4.2、yarn-cluster
1.5 如何理解Spark中的血統概念(RDD)(筆試重點)
參考:https://blog.csdn.net/wuxintdrh/article/details/70840323
RDD
在Lineage
依賴方面分為兩種Narrow Dependencies
與Wide Dependencies
用來解決資料容錯時的高效性以及劃分任務時候起到重要作用。
1.6 簡述Spark的寬窄依賴,以及Spark如何劃分stage,每個stage又根據什麼決定task個數? (筆試重點)
Stage:根據RDD之間的依賴關係的不同將Job劃分成不同的Stage,遇到一個寬依賴則劃分一個Stage。
Task:Stage是一個TaskSet,將Stage根據分割槽數劃分成一個個的Task。
1.7 請列舉Spark的transformation運算元(不少於8個),並簡述功能(重點)
參考: https://blog.csdn.net/wuxintdrh/article/details/80815731
1)map(func):返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成.
2)mapPartitions(func):類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]。假設有N個元素,有M個分割槽,那麼map的函式的將被呼叫N次,而mapPartitions被呼叫M次,一個函式一次處理所有分割槽。
3)reduceByKey(func,[numTask]):在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用定的reduce函式,將相同key的值聚合到一起,reduce任務的個數可以通過第二個可選的引數來設定。
4)aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv對的RDD中,,按key將value進行分組合並,合併時,將每個value和初始值作為seq函式的引數,進行計算,返回的結果作為一個新的kv對,然後再將結果按照key進行合併,最後將每個分組的value傳遞給combine函式進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函式,以此類推),將key與計算結果作為一個新的kv對輸出。
5)combineByKey(createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C, C) =>C):
對相同K,把V合併成一個集合。
1.createCombiner: combineByKey() 會遍歷分割槽中的所有元素,因此每個元素的鍵要麼還沒有遇到過,要麼就和之前的某個元素的鍵相同。如果這是一個新的元素,combineByKey()會使用一個叫作createCombiner()的函式來建立那個鍵對應的累加器的初始值
2.mergeValue: 如果這是一個在處理當前分割槽之前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併
3.mergeCombiners: 由於每個分割槽都是獨立處理的, 因此對於同一個鍵可以有多個累加器。如果有兩個或者更多的分割槽都有對應同一個鍵的累加器, 就需要使用使用者提供的 mergeCombiners() 方法將各個分割槽的結果進行合併。
…
根據自身情況選擇比較熟悉的運算元加以介紹。
1.8 請列舉Spark的action運算元(不少於6個),並簡述功能(重點)
參考: https://blog.csdn.net/wuxintdrh/article/details/80815731
1)reduce:
2)collect:
3)first:
4)take:
5)aggregate:
6)countByKey:
7)foreach:
8)saveAsTextFile:
1.9 請列舉會引起Shuffle過程的Spark運算元,並簡述功能。
reduceBykey:
groupByKey:
…
ByKey:
1.10 簡述Spark的兩種核心Shuffle(HashShuffle與SortShuffle)的工作流程(包括未優化的HashShuffle、優化的HashShuffle、普通的SortShuffle與bypass的SortShuffle)(重點)
未經優化的HashShuffle:
優化後的Shuffle:
普通的SortShuffle:
當 shuffle read task
的 數 量 小 於 等 於 spark.shuffle.sort
。
bypassMergeThreshold
引數的值時(預設為 200),就會啟用 bypass 機制。
1.11 Spark常用運算元reduceByKey與groupByKey的區別,哪一種更具優勢?(重點)
reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,返回結果是RDD[k,v]。
groupByKey:按照key進行分組,直接進行shuffle。
開發指導:reduceByKey比groupByKey,建議使用。但是需要注意是否會影響業務邏輯。
1.12 Repartition和Coalesce關係與區別
1)關係:
兩者都是用來改變RDD的partition數量的,repartition底層呼叫的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)區別:
repartition一定會發生shuffle,coalesce根據傳入的引數來判斷是否發生shuffle
一般情況下
- 增大rdd的partition數量使用repartition
- 減少partition數量時使用coalesce
1.13 分別簡述Spark中的快取機制(cache和persist)與checkpoint機制,並指出兩者的區別與聯絡
都是做RDD持久化的
cache
:記憶體,不會截斷血緣關係,使用計算過程中的資料快取。
checkpoint
:磁碟,截斷血緣關係,在ck之前必須沒有任何任務提交才會生效,ck過程會額外提交一次任務。
1.14 簡述Spark中共享變數(廣播變數和累加器)的基本原理與用途。(重點)
累加器(accumulator)是Spark中提供的一種分散式的變數機制,其原理類似於mapreduce,即分散式的改變,然後聚合這些改變。累加器的一個常見用途是在除錯時對作業執行過程中的事件進行計數。而廣播變數用來高效分發較大的物件。
共享變量出現的原因:
通常在向 Spark 傳遞函式時,比如使用 map() 函式或者用 filter() 傳條件時,可以使用驅動器程式中定義的變數,但是叢集中執行的每個任務都會得到這些變數的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變數。
Spark的兩個共享變數,累加器與廣播變數,分別為結果聚合與廣播這兩種常見的通訊模式突破了這一限制。
1.15 當Spark涉及到資料庫的操作時,如何減少Spark執行中的資料庫連線數?
使用foreachPartition代替foreach,在foreachPartition內獲取資料庫的連線。
1.16 簡述SparkSQL中RDD、DataFrame、DataSet三者的區別與聯絡? (筆試重點)
1)RDD
優點:
-
編譯時型別安全
-
編譯時就能檢查出型別錯誤
-
面向物件的程式設計風格
-
直接通過類名點的方式來操作資料
缺點:
-
序列化和反序列化的效能開銷
-
無論是叢集間的通訊, 還是IO操作都需要對物件的結構和資料進行序列化和反序列化。
-
GC的效能開銷,頻繁的建立和銷燬物件, 勢必會增加GC
2)DataFrame
DataFrame引入了schema
和off-heap
schema : RDD每一行的資料, 結構都是一樣的,這個結構就儲存在schema中。 Spark通過schema就能夠讀懂資料, 因此在通訊和IO時就只需要序列化和反序列化資料, 而結構的部分就可以省略了。
3)DataSet
DataSet結合了RDD和DataFrame的優點,並帶來的一個新的概念Encoder
。
當序列化資料時,Encoder產生位元組碼與off-heap進行互動,能夠達到按需訪問資料的效果,而不用反序列化整個物件。Spark還沒有提供自定義Encoder的API,但是未來會加入。
三者之間的轉換:
1.17 SparkSQL中join操作與left join操作的區別?
join和sql中的inner join操作很相似,返回結果是前面一個集合和後面一個集合中匹配成功的,過濾掉關聯不上的。
leftJoin類似於SQL中的左外關聯left outer join,返回結果以第一個RDD為主,關聯不上的記錄為空。
部分場景下可以使用left semi join替代left join:
因為 left semi join 是 in(keySet) 的關係,遇到右表重複記錄,左表會跳過,效能更高,而 left join 則會一直遍歷。但是left semi join 中最後 select 的結果中只許出現左表中的列名,因為右表只有 join key 參與關聯計算了
1.18 請手寫出wordcount的Spark程式碼實現(Scala)(手寫程式碼重點)
val conf: SparkConf = new SparkConf().setMaster("local\[*\]").setAppName("WordCount")
val sc = new SparkContext(conf)
sc.textFile("/input")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("/output")
sc.stop()
1.19、 如何使用Spark實現topN的獲取(描述思路或使用虛擬碼)(重點)
方法1:
-
(1)按照key對資料進行聚合(groupByKey)
-
(2)將value轉換為陣列,利用scala的sortBy或者sortWith進行排序(mapValues)資料量太大,會OOM。
方法2:
-
(1)取出所有的key
-
(2)對key進行迭代,每次取出一個key利用spark的排序運算元進行排序
方法3:
-
(1)自定義分割槽器,按照key進行分割槽,使不同的key進到不同的分割槽
-
(2)對每個分割槽運用spark的排序運算元進行排序
1.20 京東:調優之前與調優之後效能的詳細對比(例如調整map個數,map個數之前多少、之後多少,有什麼提升)
這裡舉個例子。比如我們有幾百個檔案,會有幾百個map出現,讀取之後進行join操作,會非常的慢。這個時候我們可以進行coalesce操作,比如240個map,我們合成60個map,也就是窄依賴。這樣再shuffle,過程產生的檔案數會大大減少。提高join的時間效能。
二、SparkStreaming
參考: https://chbxw.blog.csdn.net/article/details/80809898
2.1、 SparkStreaming有哪幾種方式消費Kafka中的資料,它們之間的區別是什麼?
1、基於Receiver的方式
這種方式使用Receiver來獲取資料。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的資料都是儲存在Spark Executor的記憶體中的(如果突然資料暴增,大量batch堆積,很容易出現記憶體溢位的問題),然後Spark Streaming啟動的job會去處理那些資料。
然而,在預設的配置下,這種方式可能會因為底層的失敗而丟失資料。如果要啟用高可靠機制,讓資料零丟失,就必須啟用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka資料寫入分散式檔案系統(比如HDFS)上的預寫日誌中。所以,即使底層節點出現了失敗,也可以使用預寫日誌中的資料進行恢復。
2、基於Direct的方式
這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收資料後,這種方式會週期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的範圍。當處理資料的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的資料。
優點如下:
簡化並行讀取:如果要讀取多個partition,不需要建立多個輸入DStream然後對它們進行union操作。Spark會建立跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取資料。所以在Kafka partition和RDD partition之間,有一個一對一的對映關係。
高效能:如果要保證零資料丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為資料實際上被複制了兩份,Kafka自己本身就有高可靠的機制,會對資料複製一份,而這裡又會複製一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了資料的複製,那麼就可以通過Kafka的副本進行恢復。
一次且僅一次的事務機制。
3、對比:
基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中儲存消費過的offset的。這是消費Kafka資料的傳統方式。這種方式配合著WAL機制可以保證資料零丟失的高可靠性,但是卻無法保證資料被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並儲存在checkpoint中。Spark自己一定是同步的,因此可以保證資料是消費一次且僅消費一次。
在實際生產環境中大都用Direct方式
2.2 簡述SparkStreaming視窗函式的原理(重點)
視窗函式就是在原來定義的SparkStreaming計算批次大小的基礎上再次進行封裝,每次計算多個批次的資料,同時還需要傳遞一個滑動步長的引數,用來設定當次計算任務完成之後下一次從什麼地方開始計算。
圖中time1就是SparkStreaming計算批次大小,虛線框以及實線大框就是視窗的大小,必須為批次的整數倍。虛線框到大實線框的距離(相隔多少批次),就是滑動步長。