1. 程式人生 > >spark相關面試題

spark相關面試題

spark面試問題收集

spark面試問題

1、spark中的RDD是什麼,有哪些特性

  • RDD(Resilient Distributed Dataset)叫做分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。
    • Dataset:就是一個集合,用於存放資料的
    • Distributed:分散式,可以並行在叢集計算
    • Resilient:表示彈性的
      • 彈性表示
        • 1、RDD中的資料可以儲存在記憶體或者是磁碟
        • 2、RDD中的分割槽是可以改變的
  • 五大特性:
    • A list of partitions
      一個分割槽列表,RDD中的資料都存在一個分割槽列表裡面
    • A function for computing each split
      作用在每一個分割槽中的函式
    • A list of dependencies on other RDDs
      一個RDD依賴於其他多個RDD,這個點很重要,RDD的容錯機制就是依據這個特性而來的
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
      可選的,針對於kv型別的RDD才具有這個特性,作用是決定了資料的來源以及資料處理後的去向
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
      可選項,資料本地性,資料位置最優

2、概述一下spark中的常用運算元區別(map、mapPartitions、foreach、foreachPartition)

  • map:用於遍歷RDD,將函式f應用於每一個元素,返回新的RDD(transformation運算元)。
  • foreach:用於遍歷RDD,將函式f應用於每一個元素,無返回值(action運算元)。
  • mapPartitions:用於遍歷操作RDD中的每一個分割槽,返回生成一個新的RDD(transformation運算元)。
  • foreachPartition: 用於遍歷操作RDD中的每一個分割槽。無返回值(action運算元)。

  • 總結:一般使用mapPartitions或者foreachPartition運算元比map和foreach更加高效,推薦使用。

3、談談spark中的寬窄依賴

  • RDD和它依賴的父RDD(s)的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
  • 寬依賴:指的是多個子RDD的Partition會依賴同一個父RDD的Partition
  • 窄依賴:指的是每一個父RDD的Partition最多被子RDD的一個Partition使用。

4、spark中如何劃分stage

  • 1.Spark Application中可以因為不同的Action觸發眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,後面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢後,後面的Stage才會執行。
  • 2.Stage劃分的依據就是寬依賴,何時產生寬依賴,例如reduceByKey,groupByKey的運算元,會導致寬依賴的產生。
  • 3.由Action(例如collect)導致了SparkContext.runJob的執行,最終導致了DAGScheduler中的submitJob的執行,其核心是通過傳送一個case class JobSubmitted物件給eventProcessLoop。
    eventProcessLoop是DAGSchedulerEventProcessLoop的具體例項,而DAGSchedulerEventProcessLoop是eventLoop的子類,具體實現EventLoop的onReceive方法,onReceive方法轉過來回調doOnReceive
  • 4.在doOnReceive中通過模式匹配的方法把執行路由到
  • 5.在handleJobSubmitted中首先建立finalStage,建立finalStage時候會建立父Stage的依賴鏈條

  • 總結:以來是從程式碼的邏輯層面上來展開說的,可以簡單點說:寫介紹什麼是RDD中的寬窄依賴,然後在根據DAG有向無環圖進行劃分,從當前job的最後一個運算元往前推,遇到寬依賴,那麼當前在這個批次中的所有運算元操作都劃分成一個stage,然後繼續按照這種方式在繼續往前推,如在遇到寬依賴,又劃分成一個stage,一直到最前面的一個運算元。最後整個job會被劃分成多個stage,而stage之間又存在依賴關係,後面的stage依賴於前面的stage。

5、spark-submit的時候如何引入外部jar包

  • 在通過spark-submit提交任務時,可以通過新增配置引數來指定
    • –driver-class-path 外部jar包
    • –jars 外部jar包

6、spark 如何防止記憶體溢位

  • driver端的記憶體溢位
    • 可以增大driver的記憶體引數:spark.driver.memory (default 1g)
    • 這個引數用來設定Driver的記憶體。在Spark程式中,SparkContext,DAGScheduler都是執行在Driver端的。對應rdd的Stage切分也是在Driver端執行,如果使用者自己寫的程式有過多的步驟,切分出過多的Stage,這部分資訊消耗的是Driver的記憶體,這個時候就需要調大Driver的記憶體。
  • map過程產生大量物件導致記憶體溢位
    • 這種溢位的原因是在單個map中產生了大量的物件導致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操作在rdd中,每個物件都產生了10000個物件,這肯定很容易產生記憶體溢位的問題。針對這種問題,在不增加記憶體的情況下,可以通過減少每個Task的大小,以便達到每個Task即使產生大量的物件Executor的記憶體也能夠裝得下。具體做法可以在會產生大量物件的map操作之前呼叫repartition方法,分割槽成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
      面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減少分割槽,不能增加分割槽,不會有shuffle的過程。
  • 資料不平衡導致記憶體溢位
    • 資料不平衡除了有可能導致記憶體溢位外,也有可能導致效能的問題,解決方法和上面說的類似,就是呼叫repartition重新分割槽。這裡就不再累贅了。
  • shuffle後記憶體溢位
    • shuffle記憶體溢位的情況可以說都是shuffle後,單個檔案過大導致的。在Spark中,join,reduceByKey這一型別的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,預設的partitioner都是HashPatitioner,預設值是父RDD中最大的分割槽數,這個引數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism引數只對HashPartitioner有效,所以如果是別的Partitioner或者自己實現的Partitioner就不能使用spark.default.parallelism這個引數來控制shuffle的併發量了。如果是別的partitioner導致的shuffle記憶體溢位,就需要從partitioner的程式碼增加partitions的數量。
  • standalone模式下資源分配不均勻導致記憶體溢位

    • 在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 這兩個引數,但是沒有配置–executor-cores這個引數的話,就有可能導致,每個Executor的memory是一樣的,但是cores的數量不同,那麼在cores數量多的Executor中,由於能夠同時執行多個Task,就容易導致記憶體溢位的情況。這種情況的解決方法就是同時配置–executor-cores或者spark.executor.cores引數,確保Executor資源分配均勻。
  • 使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()

    • rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在記憶體不足的時候rdd.cache()的資料會丟失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在記憶體不足的時候會儲存在磁碟,避免重算,只是消耗點IO時間。

7、spark中cache和persist的區別

  • cache:快取資料,預設是快取在記憶體中,其本質還是呼叫persist
  • persist:快取資料,有豐富的資料快取策略。資料可以儲存在記憶體也可以儲存在磁碟中,使用的時候指定對應的快取級別就可以了。

8、簡要描述Spark分散式叢集搭建的步驟

  • 地球人都知道
  • 這裡可以概述下如何搭建高可用的spark叢集(HA)
    • 主要是引入了zookeeper

9、spark中的資料傾斜的現象、原因、後果

  • (1)、資料傾斜的現象
    • 多數task執行速度較快,少數task執行時間非常長,或者等待很長時間後提示你記憶體不足,執行失敗。
  • (2)、資料傾斜的原因
    • 資料問題
      • 1、key本身分佈不均衡(包括大量的key為空)
      • 2、key的設定不合理
    • spark使用問題
      • 1、shuffle時的併發度不夠
      • 2、計算方式有誤
  • (3)、資料傾斜的後果
    • 1、spark中的stage的執行時間受限於最後那個執行完成的task,因此執行緩慢的任務會拖垮整個程式的執行速度(分散式程式執行的速度是由最慢的那個task決定的)。
    • 2、過多的資料在同一個task中執行,將會把executor撐爆。

10、如何解決spark中的資料傾斜問題

  • 發現數據傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。

    • 1、資料問題造成的資料傾斜

      • 找出異常的key
        • 如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。
          選取key,對資料進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個。
        • 比如: df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10)
        • 如果發現多數資料分佈都較為平均,而個別資料比其他資料大上若干個數量級,則說明發生了資料傾斜。
      • 經過分析,傾斜的資料主要有以下三種情況:
        • 1、null(空值)或是一些無意義的資訊()之類的,大多是這個原因引起。
        • 2、無效資料,大量重複的測試資料或是對結果影響不大的有效資料。
        • 3、有效資料,業務導致的正常資料分佈。
      • 解決辦法
        • 第1,2種情況,直接對資料進行過濾即可(因為該資料對當前業務不會產生影響)。
        • 第3種情況則需要進行一些特殊操作,常見的有以下幾種做法
          • (1) 隔離執行,將異常的key過濾出來單獨處理,最後與正常資料的處理結果進行union操作。
          • (2) 對key先新增隨機值,進行操作後,去掉隨機值,再進行一次操作。
          • (3) 使用reduceByKey 代替 groupByKey(reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函式自定義.)
          • (4) 使用map join。
      • 案例
        • 如果使用reduceByKey因為資料傾斜造成執行失敗的問題。具體操作流程如下:
          • (1) 將原始的 key 轉化為 key + 隨機值(例如Random.nextInt)
          • (2) 對資料進行 reduceByKey(func)
          • (3) 將 key + 隨機值 轉成 key
          • (4) 再對資料進行 reduceByKey(func)
      • 案例操作流程分析:
        • 假設說有傾斜的Key,我們給所有的Key加上一個隨機數,然後進行reduceByKey操作;此時同一個Key會有不同的隨機數字首,在進行reduceByKey操作的時候原來的一個非常大的傾斜的Key就分而治之變成若干個更小的Key,不過此時結果和原來不一樣,怎麼破?進行map操作,目的是把隨機數字首去掉,然後再次進行reduceByKey操作。(當然,如果你很無聊,可以再次做隨機數字首),這樣我們就可以把原本傾斜的Key通過分而治之方案分散開來,最後又進行了全域性聚合
        • 注意1: 如果此時依舊存在問題,建議篩選出傾斜的資料單獨處理。最後將這份資料與正常的資料進行union即可。
        • 注意2: 單獨處理異常資料時,可以配合使用Map Join解決。
    • 2、spark使用不當造成的資料傾斜

      • 提高shuffle並行度

        • dataFrame和sparkSql可以設定spark.sql.shuffle.partitions引數控制shuffle的併發度,預設為200。
        • rdd操作可以設定spark.default.parallelism控制併發度,預設引數由不同的Cluster Manager控制。
        • 侷限性: 只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執行它,也會受到資料傾斜的困擾。

11、flume整合sparkStreaming問題

  • (1)、如何實現sparkStreaming讀取flume中的資料
    • 可以這樣說:
      • 前期經過技術調研,檢視官網相關資料,發現sparkStreaming整合flume有2種模式,一種是拉模式,一種是推模式,然後在簡單的聊聊這2種模式的特點,以及如何部署實現,需要做哪些事情,最後對比兩種模式的特點,選擇那種模式更好。
        • 推模式:Flume將資料Push推給Spark Streaming
        • 拉模式:Spark Streaming從flume 中Poll拉取資料
  • (2)、在實際開發的時候是如何保證資料不丟失的

    • 可以這樣說:
      • flume那邊採用的channel是將資料落地到磁碟中,保證資料來源端安全性(可以在補充一下,flume在這裡的channel可以設定為memory記憶體中,提高資料接收處理的效率,但是由於資料在記憶體中,安全機制保證不了,故選擇channel為磁碟儲存。整個流程執行有一點的延遲性)
      • sparkStreaming通過拉模式整合的時候,使用了FlumeUtils這樣一個類,該類是需要依賴一個額外的jar包(spark-streaming-flume_2.10)
      • 要想保證資料不丟失,資料的準確性,可以在構建StreamingConext的時候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)來建立一個StreamingContext,使用StreamingContext.getOrCreate來建立StreamingContext物件,傳入的第一個引數是checkpoint的存放目錄,第二引數是生成StreamingContext物件的使用者自定義函式。如果checkpoint的存放目錄存在,則從這個目錄中生成StreamingContext物件;如果不存在,才會呼叫第二個函式來生成新的StreamingContext物件。在creatingFunc函式中,除了生成一個新的StreamingContext操作,還需要完成各種操作,然後呼叫ssc.checkpoint(checkpointDirectory)來初始化checkpoint功能,最後再返回StreamingContext物件。
        這樣,在StreamingContext.getOrCreate之後,就可以直接呼叫start()函式來啟動(或者是從中斷點繼續執行)流式應用了。如果有其他在啟動或繼續執行都要做的工作,可以在start()呼叫前執行。
      • 流失計算中使用checkpoint的作用:
        • 儲存元資料,包括流式應用的配置、流式沒崩潰之前定義的各種操作、未完成所有操作的batch。元資料被儲存到容忍失敗的儲存系統上,如HDFS。這種ckeckpoint主要針對driver失敗後的修復。
        • 儲存流式資料,也是儲存到容忍失敗的儲存系統上,如HDFS。這種ckeckpoint主要針對window operation、有狀態的操作。無論是driver失敗了,還是worker失敗了,這種checkpoint都夠快速恢復,而不需要將很長的歷史資料都重新計算一遍(以便得到當前的狀態)。
      • 設定流式資料checkpoint的週期
        • 對於一個需要做checkpoint的DStream結構,可以通過呼叫DStream.checkpoint(checkpointInterval)來設定ckeckpoint的週期,經驗上一般將這個checkpoint週期設定成batch週期的5至10倍。
      • 使用write ahead logs功能
        • 這是一個可選功能,建議加上。這個功能將使得輸入資料寫入之前配置的checkpoint目錄。這樣有狀態的資料可以從上一個checkpoint開始計算。開啟的方法是把spark.streaming.receiver.writeAheadLogs.enable這個property設定為true。另外,由於輸入RDD的預設StorageLevel是MEMORY_AND_DISK_2,即資料會在兩臺worker上做replication。實際上,Spark Streaming模式下,任何從網路輸入資料的Receiver(如kafka、flume、socket)都會在兩臺機器上做資料備份。如果開啟了write ahead logs的功能,建議把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在建立RDD時由引數傳入。
      • 使用以上的checkpoint機制,確實可以保證資料0丟失。但是一個前提條件是,資料傳送端必須要有快取功能,這樣才能保證在spark應用重啟期間,資料傳送端不會因為spark streaming服務不可用而把資料丟棄。而flume具備這種特性,同樣kafka也具備。
  • (3)Spark Streaming的資料可靠性

    • 有了checkpoint機制、write ahead log機制、Receiver快取機器、可靠的Receiver(即資料接收並備份成功後會傳送ack),可以保證無論是worker失效還是driver失效,都是資料0丟失。原因是:如果沒有Receiver服務的worker失效了,RDD資料可以依賴血統來重新計算;如果Receiver所在worker失敗了,由於Reciever是可靠的,並有write ahead log機制,則收到的資料可以保證不丟;如果driver失敗了,可以從checkpoint中恢復資料重新構建。

12、kafka整合sparkStreaming問題

  • (1)、如何實現sparkStreaming讀取kafka中的資料

    • 可以這樣說:在kafka0.10版本之前有二種方式與sparkStreaming整合,一種是基於receiver,一種是direct,然後分別闡述這2種方式分別是什麼
      • receiver:是採用了kafka高階api,利用receiver接收器來接受kafka topic中的資料,從kafka接收來的資料會儲存在spark的executor中,之後spark streaming提交的job會處理這些資料,kafka中topic的偏移量是儲存在zk中的。
        • 基本使用: val kafkaStream = KafkaUtils.createStream(streamingContext,
          [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
        • 還有幾個需要注意的點:
          • 在Receiver的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加執行緒來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理資料上的並行度.
          • 對於不同的Group和topic我們可以使用多個Receiver建立不同的Dstream來並行接收資料,之後可以利用union來統一成一個Dstream。
          • 在預設配置下,這種方式可能會因為底層的失敗而丟失資料. 因為receiver一直在接收資料,在其已經通知zookeeper資料接收完成但是還沒有處理的時候,executor突然掛掉(或是driver掛掉通知executor關閉),快取在其中的資料就會丟失. 如果希望做到高可靠, 讓資料零丟失,如果我們啟用了Write Ahead Logs(spark.streaming.receiver.writeAheadLog.enable=true)該機制會同步地將接收到的Kafka資料寫入分散式檔案系統(比如HDFS)上的預寫日誌中. 所以, 即使底層節點出現了失敗, 也可以使用預寫日誌中的資料進行恢復. 複製到檔案系統如HDFS,那麼storage level需要設定成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)
      • direct:在spark1.3之後,引入了Direct方式。不同於Receiver的方式,Direct方式沒有receiver這一層,其會週期性的獲取Kafka中每個topic的每個partition中的最新offsets,之後根據設定的maxRatePerPartition來處理每個batch。(設定spark.streaming.kafka.maxRatePerPartition=10000。限制每秒鐘從topic的每個partition最多消費的訊息條數)。
  • (2) 對比這2中方式的優缺點:

    • 採用receiver方式:這種方式可以保證資料不丟失,但是無法保證資料只被處理一次,WAL實現的是At-least-once語義(至少被處理一次),如果在寫入到外部儲存的資料還沒有將offset更新到zookeeper就掛掉,這些資料將會被反覆消費. 同時,降低了程式的吞吐量。
    • 採用direct方式:相比Receiver模式而言能夠確保機制更加健壯. 區別於使用Receiver來被動接收資料, Direct模式會週期性地主動查詢Kafka, 來獲得每個topic+partition的最新的offset, 從而定義每個batch的offset的範圍. 當處理資料的job啟動時, 就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的資料。
      • 優點:
        • 1、簡化並行讀取
          • 如果要讀取多個partition, 不需要建立多個輸入DStream然後對它們進行union操作. Spark會建立跟Kafka partition一樣多的RDD partition, 並且會並行從Kafka中讀取資料. 所以在Kafka partition和RDD partition之間, 有一個一對一的對映關係.
        • 2、高效能
          • 如果要保證零資料丟失, 在基於receiver的方式中, 需要開啟WAL機制. 這種方式其實效率低下, 因為資料實際上被複制了兩份, Kafka自己本身就有高可靠的機制, 會對資料複製一份, 而這裡又會複製一份到WAL中. 而基於direct的方式, 不依賴Receiver, 不需要開啟WAL機制, 只要Kafka中作了資料的複製, 那麼就可以通過Kafka的副本進行恢復.
        • 3、一次且僅一次的事務機制
          • 基於receiver的方式, 是使用Kafka的高階API來在ZooKeeper中儲存消費過的offset的. 這是消費Kafka資料的傳統方式. 這種方式配合著WAL機制可以保證資料零丟失的高可靠性, 但是卻無法保證資料被處理一次且僅一次, 可能會處理兩次. 因為Spark和ZooKeeper之間可能是不同步的. 基於direct的方式, 使用kafka的簡單api, Spark Streaming自己就負責追蹤消費的offset, 並儲存在checkpoint中. Spark自己一定是同步的, 因此可以保證資料是消費一次且僅消費一次。不過需要自己完成將offset寫入zk的過程,在官方文件中都有相應介紹.
            *簡單程式碼例項:
            * messages.foreachRDD(rdd=>{
            val message = rdd.map(_._2)//對資料進行一些操作
            message.map(method)//更新zk上的offset (自己實現)
            updateZKOffsets(rdd)
            })
            * sparkStreaming程式自己消費完成後,自己主動去更新zk上面的偏移量。也可以將zk中的偏移量儲存在mysql或者redis資料庫中,下次重啟的時候,直接讀取mysql或者redis中的偏移量,獲取到上次消費的偏移量,接著讀取資料。

13、利用scala語言實現排序

  • (1)氣泡排序:

    • package cn.itcast.sort
    • //氣泡排序
    • class BubbleSort {
    • def main(args: Array[String]): Unit = {
    • val list = List(3, 12, 43, 23, 7, 1, 2, 0)
    • println(sort(list))
    • }
    • //定義一個方法,傳入的引數是要進行排序的List集合,輸出的是排序後的List集合
    • def sort(list: List[Int]): List[Int] = list match {
    • case List() => List()
    • case head :: tail => compute(head, sort(tail))
    • }
    • def compute(data: Int, dataSet: List[Int]): List[Int] = dataSet match {
    • case List() => List(data)
    • case head :: tail => if (data <= head) data :: dataSet else * head :: compute(data, tail)
    • }
    • }
  • (2) 快讀排序

    • package cn.itcast.sort
    • //快速排序
    • object QuickSort {
    • def main(args: Array[String]): Unit = {
    • val list = List(3, 12, 43, 23, 7, 1, 2, 0)
    • println(quickSort(list))
      *
    • }
    • //定義一個方法,傳入的引數是要進行排序的List集合,輸出的是排序後的List集合
    • def quickSort(list: List[Int]): List[Int] = {
    • //對輸入引數list進行模式匹配
    • list match {
    • //如果是空,返回nil
    • case Nil => Nil
    • case List() => List()
    • //不為空從list中提取出首元素和剩餘元素組成的列表分別到head和tail中
    • case head :: tail =>
    • //對剩餘元素列表呼叫partition方法,這個方法會將列表分為兩部分。
    • // 劃分依據接受的引數,這個引數是一個函式(這裡是(_ < x))。
    • // partition方法會對每個元素呼叫這個函式,根據返回的true,false分成兩部分。
    • // 這裡’_ < x’是一個匿名函式(又稱lambda),’_’關鍵字是函式輸入引數的佔位符,
    • // 輸入引數這裡是列表中的每個元素。
    • val (left, right) = tail.partition(_ < head)
    • //最後對劃分好的兩部分遞迴呼叫quickSort
    • //其中head::quickSort(right) 這裡::是List定義的一個方法,用於將兩部分合成一個列表
    • quickSort(left) ++ (head :: quickSort(right))
    • }
    • }
    • }