Spark面試題整理(更新中)
**1.Spark master使用zookeeper進行HA的,有哪些元資料儲存在Zookeeper?
**答:spark通過這個引數spark.deploy.zookeeper.dir指定master元資料在zookeeper中儲存的位置,包括Worker,Driver和Application以及Executors。standby節點要從zk中,獲得元資料資訊,恢復叢集執行狀態,才能對外繼續提供服務,作業提交資源申請等,在恢復前是不能接受請求的。另外,Master切換需要注意2點:
1)在Master切換的過程中,所有的已經在執行的程式皆正常執行!因為SparkApplication在執行前就已經通過Cluster Manager獲得了計算資源,所以在執行時Job本身的排程和處理和Master是沒有任何關係的!
2) 在Master的切換過程中唯一的影響是不能提交新的Job:一方面不能夠提交新的應用程式給叢集,因為只有Active Master才能接受新的程式的提交請求;另外一方面,已經執行的程式中也不能夠因為Action操作觸發新的Job的提交請求;
**2.Spark master HA 主從切換過程不會影響叢集已有的作業執行,為什麼?
**答:Master切換需要注意2點:
1)在Master切換的過程中,所有的已經在執行的程式皆正常執行!因為SparkApplication在執行前就已經通過Cluster Manager獲得了計算資源,所以在執行時Job本身的排程和處理和Master是沒有任何關係的!
2) 在Master的切換過程中唯一的影響是不能提交新的Job:一方面不能夠提交新的應用程式給叢集,因為只有Active Master才能接受新的程式的提交請求;另外一方面,已經執行的程式中也不能夠因為Action操作觸發新的Job的提交請求;
3.Spark on Mesos中,什麼是的粗粒度分配,什麼是細粒度分配,各自的優點和缺點是什麼?
答:1)粗粒度:啟動時就分配好資源, 程式啟動,後續具體使用就使用分配好的資源,不需要再分配資源;好處:作業特別多時,資源複用率高,適合粗粒度;不好:容易資源浪費,假如一個job有1000個task,完成了999個,還有一個沒完成,那麼使用粗粒度,999個資源就會閒置在那裡,資源浪費。
2)細粒度分配:用資源的時候分配,用完了就立即回收資源,啟動會麻煩一點,啟動一次分配一次,會比較麻煩。
4.如何配置spark master的HA?
1)配置zookeeper
2)修改spark_env.sh檔案,spark的master引數不在指定,新增如下程式碼到各個master節點
export SPARK_DAEMON_JAVA_OPTS="- Dspark.deploy.recoveryMode=ZOOKEEPER-Dspark.deploy.zookeeper.url=zk01:2181,zk02:2181,zk03:2181-Dspark.deploy.zookeeper.dir=/spark"
3) 將spark_env.sh分發到各個節點
4)找到一個master節點,執行./start-all.sh,會在這裡啟動主master,其他的master備節點,啟動master命令: ./sbin/start-master.sh
5)提交程式的時候指定master的時候要指定三臺master,例如./spark-shell --masterspark://master01:7077,master02:7077,master03:7077
5.Apache Spark有哪些常見的穩定版本,Spark1.6.0的數字分別代表什麼意思?
答:常見的大的穩定版本有Spark1.3,Spark1.6, Spark 2.0 ,Spark1.6.0的數字含義
1)第一個數字:1 major version : 代表大版本更新,一般都會有一些 api 的變化,以及大的優化或是一些結構的改變;
2)第二個數字:6 minor version : 代表小版本更新,一般會新加 api,或者是對當前的 api 就行優化,或者是其他內容的更新,比如說 WEB UI 的更新等等;
3)第三個數字:0patch version , 代表修復當前小版本存在的一些 bug,基本不會有任何api 的改變和功能更新;
記得有一個大神曾經說過,如果要切換 spark 版本的話,最好選 patch version 非 0 的版本,因為一般類似於 1.2.0, … 1.6.0 這樣的版本是屬於大更新的,有可能會有一些隱藏的 bug 或是不穩定性存在,所以最好選擇 1.2.1, … 1.6.1 這樣的版本。通過版本號的解釋說明,可以很容易瞭解到,spark2.1.1的釋出時是針對大版本2.1做的一些bug修改,不會新增功能,也不會新增API,會比2.1.0版本更加穩定。
6.driver的功能是什麼?
答: 1)一個Spark作業執行時包括一個Driver程序,也是作業的主程序,具有main函式,並且有SparkContext的例項,是程式的人口點;
2)功能:負責向叢集申請資源,向master註冊資訊,負責了作業的排程,,負責作業的解析、生成Stage並排程Task到Executor上。包括DAGScheduler,TaskScheduler。
7.Spark中Work的主要工作是什麼?
答:主要功能:管理當前節點記憶體,CPU的使用狀況,接收master分配過來的資源指令,通過ExecutorRunner啟動程式分配任務,worker就類似於包工頭,管理分配新程序,做計算的服務,相當於process服務。需要注意的是:
1)worker會不會彙報當前資訊給master,worker心跳給master主要只有workid,它不會發送資源資訊以心跳的方式給mater,master分配的時候就知道work,只有出現故障的時候才會傳送資源。
2)worker不會執行程式碼,具體執行的是Executor是可以執行具體appliaction寫的業務邏輯程式碼,操作程式碼的節點,它不會執行程式的程式碼的。
**8.Spark的有幾種部署模式,每種模式特點?
答:1)本地模式Spark不一定非要跑在hadoop叢集,可以在本地,起多個執行緒的方式來指定。將Spark應用以多執行緒的方式直接執行在本地,一般都是為了方便除錯,本地模式分三類· local:只啟動一個executor· local[k]:啟動k個cpu數目相同的executor
2)standalone模式分散式部署叢集, 自帶完整的服務,資源管理和任務監控是Spark自己監控,這個模式也是其他模式的基礎
3)Spark on yarn模式分散式部署叢集,資源和任務監控交給yarn管理,但是目前僅支援粗粒度資源分配方式,包含cluster和client執行模式,cluster適合生產,driver執行在叢集子節點,具有容錯功能,client適合除錯,dirver執行在客戶端
4)Spark On Mesos模式。官方推薦這種模式(當然,原因之一是血緣關係)。正是由於Spark開發之初就考慮到支援Mesos,因此,目前而言,Spark執行在Mesos上會比執行在YARN上更加靈活,更加自然。使用者可選擇兩種排程模式之一執行自己的應用程式:1) 粗粒度模式(Coarse-grained Mode):每個應用程式的執行環境由一個Dirver和若干個Executor組成,其中,每個Executor佔用若干資源,內部可執行多個Task(對應多少個“slot”)。應用程式的各個任務正式執行之前,需要將執行環境中的資源全部申請好,且執行過程中要一直佔用這些資源,即使不用,最後程式執行結束後,回收這些資源。2) 細粒度模式(Fine-grained Mode):鑑於粗粒度模式會造成大量資源浪費,Spark On Mesos還提供了另外一種排程模式:細粒度模式,這種模式類似於現在的雲端計算,思想是按需分配。
9.Spark技術棧有哪些元件,每個元件都有什麼功能,適合什麼應用場景?
答:可以分別解釋下每個元件的功能和場景
1)Spark core:是其它元件的基礎,spark的核心,主要包含:有向迴圈圖、RDD、Lingage、Cache、broadcast等,並封裝了底層通訊框架,是Spark的基礎。
2)SparkStreaming是一個對實時資料流進行高通量、容錯處理的流式處理系統,可以對多種資料來源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等複雜操作,將流式計算分解成一系列短小的批處理作業。
3)SparkSQL:Shark是SparkSQL的前身,Spark SQL的一個重要特點是其能夠統一處理關係表和RDD,使得開發人員可以輕鬆地使用SQL命令進行外部查詢,同時進行更復雜的資料分析
4)BlinkDB :是一個用於在海量資料上執行互動式 SQL 查詢的大規模並行查詢引擎,它允許使用者通過權衡資料精度來提升查詢響應時間,其資料的精度被控制在允許的誤差範圍內。
5)MLBase是Spark生態圈的一部分專注於機器學習,讓機器學習的門檻更低,讓一些可能並不瞭解機器學習的使用者也能方便地使用MLbase。MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。
6)GraphX是Spark中用於圖和圖平行計算
10.Spark為什麼比mapreduce快?
答:1)基於記憶體計算,減少低效的磁碟互動;
2)高效的排程演算法,基於DAG;3)容錯機制Linage,精華部分就是DAG和Lingae
兩者的具體比較可以看https://blog.csdn.net/qq_43656596/article/details/86300842
11.spark中的RDD是什麼,有哪些特性?
RDD(Resilient Distributed Dataset)叫做分散式彈性資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。
Dataset:就是一個集合,用於存放資料的
Distributed:分散式,可以並行在叢集計算
Resilient:表示彈性的:
1)RDD中的資料可以儲存在記憶體或者是磁碟
2)RDD中的分割槽是可以改變的
12.談談spark中的寬窄依賴.
RDD和它依賴的父RDD(s)的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency),寬窄依賴是Spark劃分stage的根據。
寬依賴:指的是多個子RDD的Partition會依賴同一個父RDD的Partition
窄依賴:指的是每一個父RDD的Partition最多被子RDD的一個Partition使用。
13.spark中如何劃分stage?
spark中的stage是根據RDD中的寬窄依賴進行劃分的。DAGSchuduler根據lineage繪製DAG有向無環圖,然後根據DAG進行劃分,從當前job的最後一個運算元往前推,遇到寬依賴,那麼當前在這個批次中的所有運算元操作都劃分成一個stage,然後繼續按照這種方式在繼續往前推,如在遇到寬依賴,又劃分成一個stage,一直到最前面的一個運算元。最後整個job會被劃分成多個stage,而stage之間又存在依賴關係,後面的stage依賴於前面的stage。
14.spark 如何防止記憶體溢位?
spark的是基於記憶體運算的,所以最可能出現的問題就是記憶體溢位,主要有以下幾個方面:
1)driver端的記憶體溢位
在Spark程式中,SparkContext,DAGScheduler都是執行在Driver端的。對應rdd的Stage切分也是在Driver端執行,如果使用者自己寫的程式有過多的步驟,切分出過多的Stage,這部分資訊消耗的是Driver的記憶體,這個時候就需要調大Driver的記憶體。
這個時候可以通過增大driver的記憶體引數:spark.driver.memory (default 1g)來解決。
2)map過程產生大量物件導致記憶體溢位
這種溢位的原因是在單個map中產生了大量的物件導致的,
例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操作在rdd中,每個物件都產生了10000個物件,這肯定很容易產生記憶體溢位的問題。針對這種問題,在不增加記憶體的情況下,可以通過減少每個Task的大小,以便達到每個Task即使產生大量的物件Executor的記憶體也能夠裝得下。具體做法可以在會產生大量物件的map操作之前呼叫repartition方法,分割槽成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減少分割槽,不能增加分割槽,不會有shuffle的過程。
3)資料傾斜導致記憶體溢位
資料傾斜除了有可能導致記憶體溢位外,也有可能導致效能的問題,同樣可以呼叫repartition重新分割槽解決。
4)shuffle後記憶體溢位
shuffle記憶體溢位的情況可以說都是shuffle後,單個檔案過大導致的。在Spark中,join,reduceByKey這一型別的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,預設的partitioner都是HashPatitioner,預設值是父RDD中最大的分割槽數,這個引數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions), spark.default.parallelism引數只對HashPartitioner有效,所以如果是別的Partitioner或者自己實現的Partitioner就不能使用spark.default.parallelism這個引數來控制shuffle的併發量了。如果是別的partitioner導致的shuffle記憶體溢位,就需要從partitioner的程式碼增加partitions的數量。
5)standalone模式下資源分配不均勻導致記憶體溢位
在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 這兩個引數,但是沒有配置–executor-cores這個引數的話,就有可能導致,每個Executor的memory是一樣的,但是cores的數量不同,那麼在cores數量多的Executor中,由於能夠同時執行多個Task,就容易導致記憶體溢位的情況。這種情況的解決方法就是同時配置–executor-cores或者spark.executor.cores引數,確保Executor資源分配均勻。
6)使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache(),也就是改變持久化的方式,減少記憶體持久化所佔的比重。
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在記憶體不足的時候rdd.cache()的資料會丟失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在記憶體不足的時候會儲存在磁碟,避免重算,只是消耗點IO時間。
7)減少物件建立,或公用物件等可以減少記憶體使用
A.使用廣播變數broadcast將公用資料共享,可以減少記憶體使用
B.減少公共物件,這種現象較少,例如:
rdd.flatMap(x=>for(i <- 1 to 1000) yield (“key”,”value”))導致OOM,但是在同樣的情況下,使用rdd.flatMap(x=>for(i <- 1 to 1000) yield “key”+”value”)就不會有OOM的問題,這是因為每次(“key”,”value”)都產生一個Tuple物件,而”key”+”value”,不管多少個,都只有一個物件,指向常量池。
15.如何解決spark中的資料傾斜問題?
發現數據傾斜的時候,可以但不應該侷限於提高executor的資源來解決問題,通常修改引數或是修改程式碼就能夠解決一般的異常資料。
1、資料問題造成的資料傾斜
首先找出異常的key,一般通過sample運算元抽樣來判斷,
比如: df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10)。如果發現個別資料比其他資料大上若干個數量級,則說明發生了資料傾斜。
一般傾斜的資料主要有以下三種情況:
1、null(空值)或是一些無意義的資訊()之類的,大多是這個原因引起。
2、無效資料,大量重複的測試資料或是對結果影響不大的有效資料。
3、有效資料,業務導致的正常資料分佈。
解決辦法
第1,2種情況,直接對資料進行過濾即可(因為該資料對當前業務不會產生影響)。
第3種情況則需要進行一些特殊操作,常見的有以下幾種做法
(1) 隔離執行,將異常的key過濾出來單獨處理,最後與正常資料的處理結果進行union操作。
(2) 先區域性聚合再整體聚合,也就是對key先新增隨機值,進行操作後,去掉隨機值,再進行一次操作。
(3) 使用reduceByKey 代替 groupByKey(reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函式自定義.)
(4) 使用map join代替reduce join。在小表不是特別大(取決於你的executor大小)的情況下使用,可以使程式避免shuffle的過程,自然也就沒有資料傾斜的困擾了.
2、spark使用不當造成的資料傾斜
可以提高shuffle並行度,如dataFrame和sparkSql可以設定spark.sql.shuffle.partitions引數控制shuffle的併發度,預設為200;rdd操作可以設定spark.default.parallelism控制併發度,預設引數由不同的Cluster Manager控制。
16.SparkStreaming與Kafka的整合方式
1)receiver方式:
在提交 Spark Streaming 任務後,Spark 叢集會劃出指定的 Receivers 來專門、持續不斷、非同步讀取 Kafka 的資料,讀取時間間隔以及每次讀取 offsets 範圍可以由引數來配置。一般用zookeeper對offset進行儲存。當 driver 觸發 batch 任務的時候,Receivers 中的資料會轉移到剩餘的 Executors 中去執行。在執行完之後,Receivers 會相應更新 ZooKeeper 的 offset。如要確保 at least once的讀取方式,可以設 spark.streaming.receiver.writeAheadLog.enable 為 true。
2)direct方式:
Direct 方式採用 Kafka 簡單的 consumer api 方式來讀取資料,無需經由 ZooKeeper,此種方式不再需要專門 Receiver 來持續不斷讀取資料。當 batch 任務觸發時,由 Executor 讀取資料,並參與到其他 Executor 的資料計算過程中去。driver 來決定讀取多少 offset,並將offset 交由 checkpoint 來維護。Direct 方式無需 Receiver 讀取資料,而是需要計算時再讀取資料,所以 Direct 方式的資料消費對記憶體的要求不高,只需要考慮批量計算所需要的記憶體即可;另外 batch 任務堆積時,也不會影響資料堆積。
兩者之間的比較將單獨寫一篇文章。