1. 程式人生 > 其它 >【轉】大資料開發之 Spark 面試八股文

【轉】大資料開發之 Spark 面試八股文

【轉】大資料開發之 Spark 面試八股文

 

 

1. Spark 的執行流程?

 

 

具體執行流程如下:

  1. SparkContext 向資源管理器註冊並向資源管理器申請執行 Executor

  2. 資源管理器分配 Executor,然後資源管理器啟動 Executor

  3. Executor 傳送心跳至資源管理器

  4. SparkContext 構建 DAG 有向無環圖

  5. 將 DAG 分解成 Stage(TaskSet)

  6. 把 Stage 傳送給 TaskScheduler

  7. Executor 向 SparkContext 申請 Task

  8. TaskScheduler 將 Task 傳送給 Executor 執行

  9. 同時 SparkContext 將應用程式程式碼發放給 Executor

  10. Task 在 Executor 上執行,執行完畢釋放所有資源大資料培訓

2. Spark 有哪些元件?

  1. master:管理叢集和節點,不參與計算。

  2. worker:計算節點,程序本身不參與計算,和 master 彙報。

  3. Driver:執行程式的 main 方法,建立 spark context 物件。

  4. spark context:控制整個 application 的生命週期,包括 dagsheduler 和 task scheduler 等元件。

  5. client:使用者提交程式的入口。

3. Spark 中的 RDD 機制理解嗎?

rdd 分散式彈性資料集,簡單的理解成一種資料結構,是 spark 框架上的通用貨幣。所有運算元都是基於 rdd 來執行的,不同的場景會有不同的 rdd 實現類,但是都可以進行互相轉換。rdd 執行過程中會形成 dag 圖,然後形成 lineage 保證容錯性等。從物理的角度來看 rdd 儲存的是 block 和 node 之間的對映。

RDD 是 spark 提供的核心抽象,全稱為彈性分散式資料集。

RDD 在邏輯上是一個 hdfs 檔案,在抽象上是一種元素集合,包含了資料。它是被分割槽的,分為多個分割槽,每個分割槽分佈在叢集中的不同結點上,從而讓 RDD 中的資料可以被並行操作(分散式資料集)

比如有個 RDD 有 90W 資料,3 個 partition,則每個分割槽上有 30W 資料。RDD 通常通過 Hadoop 上的檔案,即 HDFS 或者 HIVE 表來建立,還可以通過應用程式中的集合來建立;RDD 最重要的特性就是容錯性,可以自動從節點失敗中恢復過來。即如果某個結點上的 RDD partition 因為節點故障,導致資料丟失,那麼 RDD 可以通過自己的資料來源重新計算該 partition。這一切對使用者都是透明的。

RDD 的資料預設存放在記憶體中,但是當記憶體資源不足時,spark 會自動將 RDD 資料寫入磁碟。比如某結點記憶體只能處理 20W 資料,那麼這 20W 資料就會放入記憶體中計算,剩下 10W 放到磁碟中。RDD 的彈性體現在於 RDD 上自動進行記憶體和磁碟之間權衡和切換的機制。

4. RDD 中 reduceBykey 與 groupByKey 哪個效能好,為什麼?

reduceByKey:reduceByKey 會在結果傳送至 reducer 之前會對每個 mapper 在本地進行 merge,有點類似於在 MapReduce 中的 combiner。這樣做的好處在於,在 map 端進行一次 reduce 之後,資料量會大幅度減小,從而減小傳輸,保證 reduce 端能夠更快的進行結果計算。

groupByKey:groupByKey 會對每一個 RDD 中的 value 值進行聚合形成一個序列(Iterator),此操作發生在 reduce 端,所以勢必會將所有的資料通過網路進行傳輸,造成不必要的浪費。同時如果資料量十分大,可能還會造成 OutOfMemoryError。

所以在進行大量資料的 reduce 操作時候建議使用 reduceByKey。不僅可以提高速度,還可以防止使用 groupByKey 造成的記憶體溢位問題。

5. 介紹一下 cogroup rdd 實現原理,你在什麼場景下用過這個 rdd?

cogroup:對多個(2~4)RDD 中的 KV 元素,每個 RDD 中相同 key 中的元素分別聚合成一個集合。

與 reduceByKey 不同的是:reduceByKey 針對一個 RDD 中相同的 key 進行合併。而 cogroup 針對多個 RDD 中相同的 key 的元素進行合併。

cogroup 的函式實現:這個實現根據要進行合併的兩個 RDD 操作,生成一個 CoGroupedRDD 的例項,這個 RDD 的返回結果是把相同的 key 中兩個 RDD 分別進行合併操作,最後返回的 RDD 的 value 是一個 Pair 的例項,這個例項包含兩個 Iterable 的值,第一個值表示的是 RDD1 中相同 KEY 的值,第二個值表示的是 RDD2 中相同 key 的值。

由於做 cogroup 的操作,需要通過 partitioner 進行重新分割槽的操作,因此,執行這個流程時,需要執行一次 shuffle 的操作(如果要進行合併的兩個 RDD 的都已經是 shuffle 後的 rdd,同時他們對應的 partitioner 相同時,就不需要執行 shuffle)。

場景:表關聯查詢或者處理重複的 key。

6. 如何區分 RDD 的寬窄依賴?

窄依賴:父 RDD 的一個分割槽只會被子 RDD 的一個分割槽依賴;

寬依賴:父 RDD 的一個分割槽會被子 RDD 的多個分割槽依賴(涉及到 shuffle)。

7. 為什麼要設計寬窄依賴?

  1. 對於窄依賴:

     

    窄依賴的多個分割槽可以平行計算;

     

    窄依賴的一個分割槽的資料如果丟失只需要重新計算對應的分割槽的資料就可以了。

  2. 對於寬依賴:

     

    劃分 Stage(階段)的依據:對於寬依賴,必須等到上一階段計算完成才能計算下一階段。

8. DAG 是什麼?

DAG(Directed Acyclic Graph 有向無環圖)指的是資料轉換執行的過程,有方向,無閉環(其實就是 RDD 執行的流程);原始的 RDD 通過一系列的轉換操作就形成了 DAG 有向無環圖,任務執行時,可以按照 DAG 的描述,執行真正的計算(資料被操作的一個過程)。

9. DAG 中為什麼要劃分 Stage?

平行計算。

一個複雜的業務邏輯如果有 shuffle,那麼就意味著前面階段產生結果後,才能執行下一個階段,即下一個階段的計算要依賴上一個階段的資料。那麼我們按照 shuffle 進行劃分(也就是按照寬依賴就行劃分),就可以將一個 DAG 劃分成多個 Stage/階段,在同一個 Stage 中,會有多個運算元操作,可以形成一個 pipeline 流水線,流水線內的多個平行的分割槽可以並行執行。

10. 如何劃分 DAG 的 stage?

對於窄依賴,partition 的轉換處理在 stage 中完成計算,不劃分(將窄依賴儘量放在在同一個 stage 中,可以實現流水線計算)。

對於寬依賴,由於有 shuffle 的存在,只能在父 RDD 處理完成後,才能開始接下來的計算,也就是說需要要劃分 stage。

11. DAG 劃分為 Stage 的演算法瞭解嗎?

核心演算法:回溯演算法

從後往前回溯/反向解析,遇到窄依賴加入本 Stage,遇見寬依賴進行 Stage 切分。

Spark 核心會從觸發 Action 操作的那個 RDD 開始從後往前推,首先會為最後一個 RDD 建立一個 Stage,然後繼續倒推,如果發現對某個 RDD 是寬依賴,那麼就會將寬依賴的那個 RDD 建立一個新的 Stage,那個 RDD 就是新的 Stage 的最後一個 RDD。然後依次類推,繼續倒推,根據窄依賴或者寬依賴進行 Stage 的劃分,直到所有的 RDD 全部遍歷完成為止。

12. 對於 Spark 中的資料傾斜問題你有什麼好的方案?

  1. 前提是定位資料傾斜,是 OOM 了,還是任務執行緩慢,看日誌,看 WebUI

  2. 解決方法,有多個方面:

  • 避免不必要的 shuffle,如使用廣播小表的方式,將 reduce-side-join 提升為 map-side-join

  • 分拆發生資料傾斜的記錄,分成幾個部分進行,然後合併 join 後的結果

  • 改變並行度,可能並行度太少了,導致個別 task 資料壓力大

  • 兩階段聚合,先區域性聚合,再全域性聚合

  • 自定義 paritioner,分散 key 的分佈,使其更加均勻

13. Spark 中的 OOM 問題?

  1. map 型別的運算元執行中記憶體溢位如 flatMap,mapPatitions

  • 原因:map 端過程產生大量物件導致記憶體溢位:這種溢位的原因是在單個 map 中產生了大量的物件導致的針對這種問題。

  1. 解決方案:

  • 增加堆內記憶體。

  • 在不增加記憶體的情況下,可以減少每個 Task 處理資料量,使每個 Task 產生大量的物件時,Executor 的記憶體也能夠裝得下。具體做法可以在會產生大量物件的 map 操作之前呼叫 repartition 方法,分割槽成更小的塊傳入 map。

  1. shuffle 後記憶體溢位如 join,reduceByKey,repartition。

  • shuffle 記憶體溢位的情況可以說都是 shuffle 後,單個檔案過大導致的。在 shuffle 的使用,需要傳入一個 partitioner,大部分 Spark 中的 shuffle 操作,預設的 partitioner 都是 HashPatitioner,預設值是父 RDD 中最大的分割槽數.這個引數 spark.default.parallelism 只對 HashPartitioner 有效.如果是別的 partitioner 導致的 shuffle 記憶體溢位就需要重寫 partitioner 程式碼了.

  1. driver 記憶體溢位

  • 使用者在 Dirver 埠生成大物件,比如建立了一個大的集合資料結構。解決方案:將大物件轉換成 Executor 端載入,比如呼叫 sc.textfile 或者評估大物件佔用的記憶體,增加 dirver 端的記憶體

  • 從 Executor 端收集資料(collect)回 Dirver 端,建議將 driver 端對 collect 回來的資料所作的操作,轉換成 executor 端 rdd 操作。

14. Spark 中資料的位置是被誰管理的?

每個資料分片都對應具體物理位置,資料的位置是被 blockManager 管理,無論資料是在磁碟,記憶體還是 tacyan,都是由 blockManager 管理。

15. Spaek 程式執行,有時候預設為什麼會產生很多 task,怎麼修改預設 task 執行個數?

  1. 輸入資料有很多 task,尤其是有很多小檔案的時候,有多少個輸入 block 就會有多少個 task 啟動;

  2. spark 中有 partition 的概念,每個 partition 都會對應一個 task,task 越多,在處理大規模資料的時候,就會越有效率。不過 task 並不是越多越好,如果平時測試,或者資料量沒有那麼大,則沒有必要 task 數量太多。

  3. 引數可以通過 spark_home/conf/spark-default.conf 配置檔案設定:

針對 spark sql 的 task 數量:spark.sql.shuffle.partitions=50

非 spark sql 程式設定生效:spark.default.parallelism=10

16. 介紹一下 join 操作優化經驗?

這道題常考,這裡只是給大家一個思路,簡單說下!面試之前還需做更多準備。

join 其實常見的就分為兩類:map-side join 和 reduce-side join。

當大表和小表 join 時,用 map-side join 能顯著提高效率。

將多份資料進行關聯是資料處理過程中非常普遍的用法,不過在分散式計算系統中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有資料根據 key 傳送到所有的 reduce 分割槽中去,也就是 shuffle 的過程。造成大量的網路以及磁碟 IO 消耗,執行效率極其低下,這個過程一般被稱為 reduce-side-join。

如果其中有張表較小的話,我們則可以自己實現在 map 端實現資料關聯,跳過大量資料進行 shuffle 的過程,執行時間得到大量縮短,根據不同資料可能會有幾倍到數十倍的效能提升。

在大資料量的情況下,join 是一中非常昂貴的操作,需要在 join 之前應儘可能的先縮小資料量。

對於縮小資料量,有以下幾條建議:

  1. 若兩個 RDD 都有重複的 key,join 操作會使得資料量會急劇的擴大。所有,最好先使用 distinct 或者 combineByKey 操作來減少 key 空間或者用 cogroup 來處理重複的 key,而不是產生所有的交叉結果。在 combine 時,進行機智的分割槽,可以避免第二次 shuffle。

  2. 如果只在一個 RDD 出現,那你將在無意中丟失你的資料。所以使用外連線會更加安全,這樣你就能確保左邊的 RDD 或者右邊的 RDD 的資料完整性,在 join 之後再過濾資料。

  3. 如果我們容易得到 RDD 的可以的有用的子集合,那麼我們可以先用 filter 或者 reduce,如何在再用 join。

17. Spark 與 MapReduce 的 Shuffle 的區別?

  1. 相同點:都是將 mapper(Spark 裡是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark 裡 reducer 可能是下一個 stage 裡的 ShuffleMapTask,也可能是 ResultTask)

  2. 不同點:

  • MapReduce 預設是排序的,spark 預設不排序,除非使用 sortByKey 運算元。

  • MapReduce 可以劃分成 split,map()、spill、merge、shuffle、sort、reduce()等階段,spark 沒有明顯的階段劃分,只有不同的 stage 和運算元操作。

  • MR 落盤,Spark 不落盤,spark 可以解決 mr 落盤導致效率低下的問題。

18. Spark SQL 執行的流程?

這個問題如果深挖還挺複雜的,這裡簡單介紹下總體流程:

  1. parser:基於 antlr 框架對 sql 解析,生成抽象語法樹。

  2. 變數替換:通過正則表示式找出符合規則的字串,替換成系統快取環境的變數

SQLConf 中的 spark.sql.variable.substitute,預設是可用的;

  1. parser:將 antlr 的 tree 轉成 spark catalyst 的 LogicPlan,也就是 未解析的邏輯計劃;詳細參考 AstBuild, ParseDriver

  2. analyzer:通過分析器,結合 catalog,把 logical plan 和實際的資料繫結起來,將 未解析的邏輯計劃 生成 邏輯計劃;詳細參考 QureyExecution

  3. 快取替換:通過 CacheManager,替換有相同結果的 logical plan(邏輯計劃)

  4. logical plan 優化,基於規則的優化;優化規則參考 Optimizer,優化執行器 RuleExecutor

  5. 生成 spark plan,也就是物理計劃;參考 QueryPlanner 和 SparkStrategies

  6. spark plan 準備階段

  7. 構造 RDD 執行,涉及 spark 的 wholeStageCodegenExec 機制,基於 janino 框架生成 java 程式碼並編譯

19. Spark SQL 是如何將資料寫到 Hive 表的?

  • 方式一:是利用 Spark RDD 的 API 將資料寫入 hdfs 形成 hdfs 檔案,之後再將 hdfs 檔案和 hive 表做載入對映。

  • 方式二:利用 Spark SQL 將獲取的資料 RDD 轉換成 DataFrame,再將 DataFrame 寫成快取表,最後利用 Spark SQL 直接插入 hive 表中。而對於利用 Spark SQL 寫 hive 表官方有兩種常見的 API,第一種是利用 JavaBean 做對映,第二種是利用 StructType 建立 Schema 做對映。

20. 通常來說,Spark 與 MapReduce 相比,Spark 執行效率更高。請說明效率更高來源於 Spark 內建的哪些機制?

  1. 基於記憶體計算,減少低效的磁碟互動;

  2. 高效的排程演算法,基於 DAG;

  3. 容錯機制 Linage。

重點部分就是 DAG 和 Lingae

21. Hadoop 和 Spark 的相同點和不同點?

Hadoop 底層使用 MapReduce 計算架構,只有 map 和 reduce 兩種操作,表達能力比較欠缺,而且在 MR 過程中會重複的讀寫 hdfs,造成大量的磁碟 io 讀寫操作,所以適合高時延環境下批處理計算的應用;

Spark 是基於記憶體的分散式計算架構,提供更加豐富的資料集操作型別,主要分成轉化操作和行動操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,資料分析更加快速,所以適合低時延環境下計算的應用;

spark 與 hadoop 最大的區別在於迭代式計算模型。基於 mapreduce 框架的 Hadoop 主要分為 map 和 reduce 兩個階段,兩個階段完了就結束了,所以在一個 job 裡面能做的處理很有限;spark 計算模型是基於記憶體的迭代式計算模型,可以分為 n 個階段,根據使用者編寫的 RDD 運算元和程式,在處理完一個階段後可以繼續往下處理很多個階段,而不只是兩個階段。所以 spark 相較於 mapreduce,計算模型更加靈活,可以提供更強大的功能。

但是 spark 也有劣勢,由於 spark 基於記憶體進行計算,雖然開發容易,但是真正面對大資料的時候,在沒有進行調優的情況下,可能會出現各種各樣的問題,比如 OOM 記憶體溢位等情況,導致 spark 程式可能無法執行起來,而 mapreduce 雖然執行緩慢,但是至少可以慢慢執行完。

22. Hadoop 和 Spark 使用場景?

Hadoop/MapReduce 和 Spark 最適合的都是做離線型的資料分析,但 Hadoop 特別適合是單次分析的資料量“很大”的情景,而 Spark 則適用於資料量不是很大的情景。

  1. 一般情況下,對於中小網際網路和企業級的大資料應用而言,單次分析的數量都不會“很大”,因此可以優先考慮使用 Spark。

  2. 業務通常認為 Spark 更適用於機器學習之類的“迭代式”應用,80GB 的壓縮資料(解壓後超過 200GB),10 個節點的叢集規模,跑類似“sum+group-by”的應用,MapReduce 花了 5 分鐘,而 spark 只需要 2 分鐘。

23. Spark 如何保證宕機迅速恢復?

  1. 適當增加 spark standby master

  2. 編寫 shell 指令碼,定期檢測 master 狀態,出現宕機後對 master 進行重啟操作

24. RDD 持久化原理?

spark 非常重要的一個功能特性就是可以將 RDD 持久化在記憶體中。

呼叫 cache()和 persist()方法即可。cache()和 persist()的區別在於,cache()是 persist()的一種簡化方式,cache()的底層就是呼叫 persist()的無參版本 persist(MEMORY_ONLY),將資料持久化到記憶體中。

如果需要從記憶體中清除快取,可以使用 unpersist()方法。RDD 持久化是可以手動選擇不同的策略的。在呼叫 persist()時傳入對應的 StorageLevel 即可。

25. Checkpoint 檢查點機制?

應用場景:當 spark 應用程式特別複雜,從初始的 RDD 開始到最後整個應用程式完成有很多的步驟,而且整個應用執行時間特別長,這種情況下就比較適合使用 checkpoint 功能。

原因:對於特別複雜的 Spark 應用,會出現某個反覆使用的 RDD,即使之前持久化過但由於節點的故障導致資料丟失了,沒有容錯機制,所以需要重新計算一次資料。

Checkpoint 首先會呼叫 SparkContext 的 setCheckPointDIR()方法,設定一個容錯的檔案系統的目錄,比如說 HDFS;然後對 RDD 呼叫 checkpoint()方法。之後在 RDD 所處的 job 執行結束之後,會啟動一個單獨的 job,來將 checkpoint 過的 RDD 資料寫入之前設定的檔案系統,進行高可用、容錯的類持久化操作。

檢查點機制是我們在 spark streaming 中用來保障容錯性的主要機制,它可以使 spark streaming 階段性的把應用資料儲存到諸如 HDFS 等可靠儲存系統中,以供恢復時使用。具體來說基於以下兩個目的服務:

  1. 控制發生失敗時需要重算的狀態數。Spark streaming 可以通過轉化圖的譜系圖來重算狀態,檢查點機制則可以控制需要在轉化圖中回溯多遠。

  2. 提供驅動器程式容錯。如果流計算應用中的驅動器程式崩潰了,你可以重啟驅動器程式並讓驅動器程式從檢查點恢復,這樣 spark streaming 就可以讀取之前執行的程式處理資料的進度,並從那裡繼續。

26. Checkpoint 和持久化機制的區別?

最主要的區別在於持久化只是將資料儲存在 BlockManager 中,但是 RDD 的 lineage(血緣關係,依賴關係)是不變的。但是 checkpoint 執行完之後,rdd 已經沒有之前所謂的依賴 rdd 了,而只有一個強行為其設定的 checkpointRDD,checkpoint 之後 rdd 的 lineage 就改變了。

持久化的資料丟失的可能性更大,因為節點的故障會導致磁碟、記憶體的資料丟失。但是 checkpoint 的資料通常是儲存在高可用的檔案系統中,比如 HDFS 中,所以資料丟失可能性比較低

27. Spark Streaming 以及基本工作原理?

Spark streaming 是 spark core API 的一種擴充套件,可以用於進行大規模、高吞吐量、容錯的實時資料流的處理。

它支援從多種資料來源讀取資料,比如 Kafka、Flume、Twitter 和 TCP Socket,並且能夠使用運算元比如 map、reduce、join 和 window 等來處理資料,處理後的資料可以儲存到檔案系統、資料庫等儲存中。

Spark streaming 內部的基本工作原理是:接受實時輸入資料流,然後將資料拆分成 batch,比如每收集一秒的資料封裝成一個 batch,然後將每個 batch 交給 spark 的計算引擎進行處理,最後會生產處一個結果資料流,其中的資料也是一個一個的 batch 組成的。

28. DStream 以及基本工作原理?

DStream 是 spark streaming 提供的一種高階抽象,代表了一個持續不斷的資料流。

DStream 可以通過輸入資料來源來建立,比如 Kafka、flume 等,也可以通過其他 DStream 的高階函式來建立,比如 map、reduce、join 和 window 等。

DStream 內部其實不斷產生 RDD,每個 RDD 包含了一個時間段的資料。

Spark streaming 一定是有一個輸入的 DStream 接收資料,按照時間劃分成一個一個的 batch,並轉化為一個 RDD,RDD 的資料是分散在各個子節點的 partition 中。

29. Spark Streaming 整合 Kafka 的兩種模式?

  1. receiver 方式:將資料拉取到 executor 中做操作,若資料量大,記憶體儲存不下,可以通過 WAL,設定了本地儲存,保證資料不丟失,然後使用 Kafka 高階 API 通過 zk 來維護偏移量,保證消費資料。receiver 消費的資料偏移量是在 zk 獲取的,此方式效率低,容易出現數據丟失。

  • receiver 方式的容錯性:在預設的配置下,這種方式可能會因為底層的失敗而丟失資料。如果要啟用高可靠機制,讓資料零丟失,就必須啟用 Spark Streaming 的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的 Kafka 資料寫入分散式檔案系統(比如 HDFS)上的預寫日誌中。所以,即使底層節點出現了失敗,也可以使用預寫日誌中的資料進行恢復。

  • Kafka 中的 topic 的 partition,與 Spark 中的 RDD 的 partition 是沒有關係的。在 1、KafkaUtils.createStream()中,提高 partition 的數量,只會增加 Receiver 方式中讀取 partition 的執行緒的數量。不會增加 Spark 處理資料的並行度。可以建立多個 Kafka 輸入 DStream,使用不同的 consumer group 和 topic,來通過多個 receiver 並行接收資料。

  1. 基於 Direct 方式:使用 Kafka 底層 Api,其消費者直接連線 kafka 的分割槽上,因為 createDirectStream 建立的 DirectKafkaInputDStream 每個 batch 所對應的 RDD 的分割槽與 kafka 分割槽一一對應,但是需要自己維護偏移量,即用即取,不會給記憶體造成太大的壓力,效率高。

  • 優點:簡化並行讀取:如果要讀取多個 partition,不需要建立多個輸入 DStream 然後對它們進行 union 操作。Spark 會建立跟 Kafka partition 一樣多的 RDD partition,並且會並行從 Kafka 中讀取資料。所以在 Kafka partition 和 RDD partition 之間,有一個一對一的對映關係。

  • 高效能:如果要保證零資料丟失,在基於 receiver 的方式中,需要開啟 WAL 機制。這種方式其實效率低下,因為資料實際上被複制了兩份,Kafka 自己本身就有高可靠的機制,會對資料複製一份,而這裡又會複製一份到 WAL 中。而基於 direct 的方式,不依賴 Receiver,不需要開啟 WAL 機制,只要 Kafka 中作了資料的複製,那麼就可以通過 Kafka 的副本進行恢復。

  1. receiver 與和 direct 的比較:

  • 基於 receiver 的方式,是使用 Kafka 的高階 API 來在 ZooKeeper 中儲存消費過的 offset 的。這是消費 Kafka 資料的傳統方式。這種方式配合著 WAL 機制可以保證資料零丟失的高可靠性,但是卻無法保證資料被處理一次且僅一次,可能會處理兩次。因為 Spark 和 ZooKeeper 之間可能是不同步的。

  • 基於 direct 的方式,使用 Kafka 的低階 API,Spark Streaming 自己就負責追蹤消費的 offset,並儲存在 checkpoint 中。Spark 自己一定是同步的,因此可以保證資料是消費一次且僅消費一次。

  • Receiver 方式是通過 zookeeper 來連線 kafka 佇列,Direct 方式是直接連線到 kafka 的節點上獲取資料。

30. Spark 主備切換機制原理知道嗎?

Master 實際上可以配置兩個,Spark 原生的 standalone 模式是支援 Master 主備切換的。當 Active Master 節點掛掉以後,我們可以將 Standby Master 切換為 Active Master。

Spark Master 主備切換可以基於兩種機制,一種是基於檔案系統的,一種是基於 ZooKeeper 的。

基於檔案系統的主備切換機制,需要在 Active Master 掛掉之後手動切換到 Standby Master 上;

而基於 Zookeeper 的主備切換機制,可以實現自動切換 Master。

31. Spark 解決了 Hadoop 的哪些問題?

  1. MR:抽象層次低,需要使用手工程式碼來完成程式編寫,使用上難以上手;Spark:Spark 採用 RDD 計算模型,簡單容易上手。

  2. MR:只提供 map 和 reduce 兩個操作,表達能力欠缺;Spark:Spark 採用更加豐富的運算元模型,包括 map、flatmap、groupbykey、reducebykey 等;

  3. MR:一個 job 只能包含 map 和 reduce 兩個階段,複雜的任務需要包含很多個 job,這些 job 之間的管理以來需要開發者自己進行管理;Spark:Spark 中一個 job 可以包含多個轉換操作,在排程時可以生成多個 stage,而且如果多個 map 操作的分割槽不變,是可以放在同一個 task 裡面去執行;

  4. MR:中間結果存放在 hdfs 中;Spark:Spark 的中間結果一般存在記憶體中,只有當記憶體不夠了,才會存入本地磁碟,而不是 hdfs;

  5. MR:只有等到所有的 map task 執行完畢後才能執行 reduce task;Spark:Spark 中分割槽相同的轉換構成流水線在一個 task 中執行,分割槽不同的需要進行 shuffle 操作,被劃分成不同的 stage 需要等待前面的 stage 執行完才能執行。

  6. MR:只適合 batch 批處理,時延高,對於互動式處理和實時處理支援不夠;Spark:Spark streaming 可以將流拆成時間間隔的 batch 進行處理,實時計算。

32. 資料傾斜的產生和解決辦法?

資料傾斜以為著某一個或者某幾個 partition 的資料特別大,導致這幾個 partition 上的計算需要耗費相當長的時間。

在 spark 中同一個應用程式劃分成多個 stage,這些 stage 之間是序列執行的,而一個 stage 裡面的多個 task 是可以並行執行,task 數目由 partition 數目決定,如果一個 partition 的數目特別大,那麼導致這個 task 執行時間很長,導致接下來的 stage 無法執行,從而導致整個 job 執行變慢。

避免資料傾斜,一般是要選用合適的 key,或者自己定義相關的 partitioner,通過加鹽或者雜湊值來拆分這些 key,從而將這些資料分散到不同的 partition 去執行。

如下運算元會導致 shuffle 操作,是導致資料傾斜可能發生的關鍵點所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;

33. 你用 Spark Sql 處理的時候, 處理過程中用的 DataFrame 還是直接寫的 Sql?為什麼?

這個問題的宗旨是問你 spark sql 中 dataframe 和 sql 的區別,從執行原理、操作方便程度和自定義程度來分析 這個問題。

34. Spark Master HA 主從切換過程不會影響到叢集已有作業的執行,為什麼?

不會的。

因為程式在執行之前,已經申請過資源了,driver 和 Executors 通訊,不需要和 master 進行通訊的。

35. Spark Master 使用 Zookeeper 進行 HA,有哪些源資料儲存到 Zookeeper 裡面?

spark 通過這個引數 spark.deploy.zookeeper.dir 指定 master 元資料在 zookeeper 中儲存的位置,包括 Worker,Driver 和 Application 以及 Executors。standby 節點要從 zk 中,獲得元資料資訊,恢復叢集執行狀態,才能對外繼續提供服務,作業提交資源申請等,在恢復前是不能接受請求的。

注:Master 切換需要注意 2 點:1、在 Master 切換的過程中,所有的已經在執行的程式皆正常執行!因為 Spark Application 在執行前就已經通過 Cluster Manager 獲得了計算資源,所以在執行時 Job 本身的 排程和處理和 Master 是沒有任何關係。2、在 Master 的切換過程中唯一的影響是不能提交新的 Job:一方面不能夠提交新的應用程式給叢集, 因為只有 Active Master 才能接受新的程式的提交請求;另外一方面,已經執行的程式中也不能夠因 Action 操作觸發新的 Job 的提交請求。

文章源自五分鐘學大資料