Spark學習之Spark核心
一、Spark中的一些專業術語
任務:
- Application:使用者寫的應用程式,包括Driver Program和Executor Program。
- Job:一個action類運算元觸發執行的操作。
- stage:一組任務(task)就是一個stage。
- task:(thread)在叢集中執行時最小的執行單元。
資源、叢集:
- Master:資源管理的主節點。
- Worker:資源管理的從節點。
- Executor:執行任務的程序。
- ThreadPool:執行緒池,存在於Executor程序中。
二、RDD的寬窄依賴關係
1、窄依賴
父RDD與子RDD,partition之間的關係是一對一,那麼父子RDD的依賴關係就稱之為窄依賴,這種依賴關係不存在shuffle過程。
2、寬依賴
父RDD與子RDD,partition之間的關係是一對多,那麼父子RDD的依賴關係就稱之為寬依賴,這種依賴關係存在shuffle過程。
預設情況下,groupByKey返回的RDD分割槽數與父RDD是一致的,如果在使用groupByKey的時候,傳入一個int型別的值,此時返回的RDD分割槽數就是這個int值。
總結:
- 父RDD不知道有幾個子RDD,但子RDD知道他的父RDD有幾個。基於此特點,形成一個DAG有向無環圖需要
- 寬窄依賴的作用就是為了將一個個的job切割成一個個的stage
三、Stage切割規則
上圖是將job切割成Stage的過程。
總結:
- 切割後的結果是stage與stage之間是寬依賴,stage之間是窄依賴。
- 將job切割成stage的目的?stage與stage之間有shuffle,stage內部無shuffle。
- RDD中實際上儲存的是計算邏輯,而不是真實的資料。
Stage計算模式:
task0這條線所貫穿所有的的partition中的計算邏輯,並且以遞迴函式展開式的形式整合在一起,fun2(fun1(textFile(b1))),最好傳送到b1以及他副本所在的節點。
task1:fun2(fun1(textFile(b1)))最好傳送到b2以及他副本所在的節點。
task的計算模式是pipeline的計算模式,管道計算。
總結:
MapReduce的計算模式是1+1=2,2+1=3,會有資料落地,Spark的計算模式是1+1+1=3,不會有資料落地的情況。
四、任務排程
Spark是一個分散式平行計算框架。我們寫的Application要在叢集中分散式計算,由於大資料中的計算原則是計算找資料,為了將每一個task精準的分發到節點上,此時需要任務排程器,找到資料的位置,從而分發task到節點上。
任務排程過程:
首先根據程式碼生成DAG有向無環圖,然後將有向無環圖交給DAGScheduler:
- 步驟一:根據RDD的寬窄依賴關係,將DAG切割成一個個的stage,將切割出來的stage封裝到TaskSet物件中,然後將一個個的TaskSet給TaskScheduler;
- 步驟二:TaskScheduler拿到TaskSet以後,會遍歷這個結果,拿到每一個task,然後去呼叫HDFS上的某一個方法,獲取資料的位置,根據資料的位置來分發task到Woker節點的Executor程序中的執行緒池中執行;
- 步驟三:TaskScheduler會實時跟蹤每一個task的執行情況,若執行失敗,TaskScheduler會重試提交task,不會無休止的重試,預設是重試3次,如果重試3次依舊失敗,那麼這個task所在stage就失敗了,此時TS向DAGScheduler彙報;
- 步驟四:TaskScheduler向DAGScheduler彙報當前stage失敗,此時DAGScheduler會重試提交stage。注意:每一次重試提交的stage,已經成功執行的不會被再次分發到Executor程序執行,只是提交重試失敗的。
- 如果DAGScheduler重試了4次依然失敗,那麼stage所在的job就失敗了,job失敗是不會進行重試的。DAGScheduler重試次數spark.stage.maxConsecutiveAttempts可設定。
TaskScheduler:retry failed or straggling tasks。當有掙扎(掉隊)任務時,也會重試。
掙扎(掉隊)任務:
比如10000個task中,有9999個執行完成,只有一個task正在執行,那麼這個任務就叫掙扎任務。
TaskScheduler遇到掙扎任務也會重試,此時TaskScheduler會重新提交一個和掙扎task一模一樣的task到叢集中執行,但是掙扎task不會被kill,會讓他倆在叢集中比賽執行,誰先執行完畢,就以誰的結果為準。
推測執行機制:用來判斷哪些task是掙扎task。
推測執行機制的標準:當所有的task75%以上全部執行完畢,那麼TasredkScheduler才會每隔100ms計算一下哪一些task需要推測執行。
例如:100個task中有76個task已執行完畢,24個task沒有執行完畢,此時他會計算這24個task已經執行時間的中位數,然後將中位數*1.5得到最終時間,拿到這個最終計算出來的時間,去檢視哪一些task超時,此時這些task就是掙扎task。
配置資訊的使用:
- 在程式碼中SparkConf
- 在提交Application的時候通過–conf來設定(spark-submit
–master --conf k=v),如果要修改多個配置資訊的值,那麼需要加多個–conf比如說stage、task的重試次數都要修改,此時需要加上兩個–conf,分別來設定。(常用) - 在spark的配置檔案中配置,spark-default.conf