spark學習進度13(spark和高階特效)
這一節基本上全是概念:::
更新的幾種寫法:
部署:
邏輯:
其實 RDD 並沒有什麼嚴格的邏輯執行圖和物理執行圖的概念, 這裡也只是借用這個概念, 從而讓整個 RDD 的原理可以解釋, 好理解.
對於 RDD 的邏輯執行圖, 起始於第一個入口 RDD 的建立, 結束於 Action 運算元執行之前, 主要的過程就是生成一組互相有依賴關係的 RDD, 其並不會真的執行, 只是表示 RDD 之間的關係, 資料的流轉過程.
物理:
當觸發 Action 執行的時候, 這一組互相依賴的 RDD 要被處理, 所以要轉化為可執行的物理執行圖, 排程到叢集中執行.
因為大部分 RDD 是不真正存放資料的, 只是資料從中流轉, 所以, 不能直接在叢集中執行 RDD, 要有一種 Pipeline 的思想, 需要將這組 RDD 轉為 Stage 和 Task, 從而執行 Task, 優化整體執行速度.
以上的邏輯執行圖會生成如下的物理執行圖, 這一切發生在 Action 操作被執行時.
從上圖可以總結如下幾個點
-
在第一個
Stage
中, 每一個這樣的執行流程是一個Task
, 也就是在同一個 Stage 中的所有 RDD 的對應分割槽, 在同一個 Task 中執行 -
Stage 的劃分是由 Shuffle 操作來確定的, 有 Shuffle 的地方, Stage 斷開
textFile
運算元的背後-
研究
RDD
的功能或者表現的時候, 其實本質上研究的就是RDD
中的五大屬性, 因為RDD
透過五大屬性來提供功能和表現, 所以如果要研究textFile
這個運算元, 應該從五大屬性著手, 那麼第一步就要看看生成的RDD
是什麼型別的RDD
-
textFile
生成的是HadoopRDD
除了上面這一個步驟以外, 後續步驟將不再直接基於程式碼進行講解, 因為從程式碼的角度著手容易迷失邏輯, 這個章節的初心有兩個, 一個是希望大家瞭解 Spark 的內部邏輯和原理, 另外一個是希望大家能夠通過本章學習具有程式碼分析的能力
-
HadoopRDD
的Partitions
對應了HDFS
的Blocks
其實本質上每個
HadoopRDD
的Partition
都是對應了一個Hadoop
的Block
, 通過InputFormat
來確定Hadoop
中的Block
的位置和邊界, 從而可以供一些運算元使用 -
HadoopRDD
的compute
函式就是在讀取HDFS
中的Block
本質上,
compute
還是依然使用InputFormat
來讀取HDFS
中對應分割槽的Block
-
textFile
這個運算元生成的其實是一個MapPartitionsRDD
textFile
這個運算元的作用是讀取HDFS
上的檔案, 但是HadoopRDD
中存放是一個元組, 其Key
是行號, 其Value
是Hadoop
中定義的Text
物件, 這一點和MapReduce
程式中的行為是一致的但是並不適合
Spark
的場景, 所以最終會通過一個map
運算元, 將(LineNum, Text)
轉為String
形式的一行一行的資料, 所以最終textFile
這個運算元生成的RDD
並不是HadoopRDD
, 而是一個MapPartitionsRDD
-
map
運算元的背後-
-
map
運算元生成了MapPartitionsRDD
由原始碼可知, 當
val rdd2 = rdd1.map()
的時候, 其實生成的新RDD
是rdd2
,rdd2
的型別是MapPartitionsRDD
, 每個RDD
中的五大屬性都會有一些不同, 由map
運算元生成的RDD
中的計算函式, 本質上就是遍歷對應分割槽的資料, 將每一個數據轉成另外的形式 -
MapPartitionsRDD
的計算函式是collection.map( function )
真正執行的叢集中的處理單元是
Task
, 每個Task
對應一個RDD
的分割槽, 所以collection
對應一個RDD
分割槽的所有資料, 而這個計算的含義就是將一個RDD
的分割槽上所有資料當作一個集合, 通過這個Scala
集合的map
運算元, 來執行一個轉換操作, 其轉換操作的函式就是傳入map
運算元的function
-
傳入
map
運算元的函式會被清理這個清理主要是處理閉包中的依賴, 使得這個閉包可以被序列化發往不同的叢集節點執行
-
flatMap
運算元的背後-
flatMap
和map
運算元其實本質上是一樣的, 其步驟和生成的RDD
都是一樣, 只是對於傳入函式的處理不同,map
是collect.map( function )
而flatMap
是collect.flatMap( function )
從側面印證了, 其實
Spark
中的flatMap
和Scala
基礎中的flatMap
其實是一樣的 textRDD
→splitRDD
→tupleRDD
-
由
總結textRDD
到splitRDD
再到tupleRDD
的過程, 其實就是呼叫map
和flatMap
運算元生成新的RDD
的過程, 所以如下圖所示, 就是這個階段所生成的邏輯計劃
- 如何生成
RDD
? -
生成
RDD
的常見方式有三種-
從本地集合建立
-
從外部資料集建立
-
從其它
RDD
衍生
通過外部資料集建立
RDD
, 是通過Hadoop
或者其它外部資料來源的SDK
來進行資料讀取, 同時如果外部資料來源是有分片的話,RDD
會將分割槽與其分片進行對照通過其它
RDD
衍生的話, 其實本質上就是通過不同的運算元生成不同的RDD
的子類物件, 從而控制compute
函式的行為來實現運算元功能 -
- 生成哪些
RDD
? -
不同的運算元生成不同的
RDD
, 生成RDD
的型別取決於運算元, 例如map
和flatMap
都會生成RDD
的子類MapPartitions
的物件 - 如何計算
RDD
中的資料 ? -
雖然前面我們提到過
RDD
是偏向計算的, 但是其實RDD
還只是表示資料, 縱觀RDD
的五大屬性中有三個是必須的, 分別如下-
Partitions List
分割槽列表 -
Compute function
計算函式 -
Dependencies
依賴
雖然計算函式是和計算有關的, 但是隻有呼叫了這個函式才會進行計算,
RDD
顯然不會自己呼叫自己的Compute
函式, 一定是由外部呼叫的, 所以RDD
更多的意義是用於表示資料集以及其來源, 和針對於資料的計算所以如何計算
RDD
中的資料呢? 一定是通過其它的元件來計算的, 而計算的規則, 由RDD
中的Compute
函式來指定, 不同型別的RDD
子類有不同的Compute
函式什麼是RDD
之間的依賴關係?-
什麼是關係(依賴關係) ?
從運算元視角上來看,
splitRDD
通過map
運算元得到了tupleRDD
, 所以splitRDD
和tupleRDD
之間的關係是map
但是僅僅這樣說, 會不夠全面, 從細節上來看,
RDD
只是資料和關於資料的計算, 而具體執行這種計算得出結果的是一個神祕的其它元件, 所以, 這兩個RDD
的關係可以表示為splitRDD
的資料通過map
操作, 被傳入tupleRDD
, 這是它們之間更細化的關係但是
RDD
這個概念本身並不是資料容器, 資料真正應該存放的地方是RDD
的分割槽, 所以如果把視角放在資料這一層面上的話, 直接講這兩個 RDD 之間有關係是不科學的, 應該從這兩個 RDD 的分割槽之間的關係來討論它們之間的關係 -
那這些分割槽之間是什麼關係?
如果僅僅說
splitRDD
和tupleRDD
之間的話, 那它們的分割槽之間就是一對一的關係但是
tupleRDD
到reduceRDD
呢?tupleRDD
通過運算元reduceByKey
生成reduceRDD
, 而這個運算元是一個Shuffle
操作,Shuffle
操作的兩個RDD
的分割槽之間並不是一對一,reduceByKey
的一個分割槽對應tupleRDD
的多個分割槽
reduceByKey
運算元會生成ShuffledRDD
reduceByKey
是由運算元combineByKey
來實現的,combineByKey
內部會建立ShuffledRDD
返回, 具體的程式碼請大家通過IDEA
來進行檢視, 此處不再截圖, 而整個reduceByKey
操作大致如下過程去掉兩個
reducer
端的分割槽, 只留下一個的話, 如下所以, 對於
reduceByKey
這個Shuffle
操作來說,reducer
端的一個分割槽, 會從多個mapper
端的分割槽拿取資料, 是一個多對一的關係至此為止, 出現了兩種分割槽見的關係了, 一種是一對一, 一種是多對一
整體上的流程圖
物理圖的作用是什麼?- 問題一: 物理圖的意義是什麼?
-
物理圖解決的其實就是
RDD
流程生成以後, 如何計算和執行的問題, 也就是如何把 RDD 放在叢集中執行的問題 - 問題二: 如果要確定如何執行的問題, 則需要先確定叢集中有什麼元件
-
-
首先叢集中物理元件就是一臺一臺的機器
-
其次這些機器上跑的守護程序有兩種:
Master
,Worker
-
每個守護程序其實就代表了一臺機器, 代表這臺機器的角色, 代表這臺機器和外界通訊
-
例如我們常說一臺機器是
Master
, 其含義是這臺機器中運行了一個Master
守護程序, 如果一臺機器運行了Master
的同時又運行了Worker
, 則說這臺機器是Master
也可以, 說它是Worker
也行
-
-
真正能執行
RDD
的元件是:Executor
, 也就是說其實RDD
最終是執行在Executor
中的, 也就是說, 無論是Master
還是Worker
其實都是用於管理Executor
和排程程式的
結論是
RDD
一定在Executor
中計算, 而Master
和Worker
負責排程和管理Executor
-
- 問題三: 物理圖的生成需要考慮什麼問題?
-
-
要計算
RDD
, 不僅要計算, 還要很快的計算 → 優化效能 -
要考慮容錯, 容錯的常見手段是快取 →
RDD
要可以快取
結論是在生成物理圖的時候, 不僅要考慮效率問題, 還要考慮一種更合適的方式, 讓
RDD
執行的更好 -
- 問題一: RDD 是什麼, 用來做什麼 ?
-
回顧一下
RDD
的五個屬性-
A list of partitions
-
A function for computing each split
-
A list of dependencies on other RDDs
-
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
-
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
簡單的說就是: 分割槽列表, 計算函式, 依賴關係, 分割槽函式, 最佳位置
-
分割槽列表, 分割槽函式, 最佳位置, 這三個屬性其實說的就是資料集在哪, 在哪更合適, 如何分割槽
-
計算函式和依賴關係, 這兩個屬性其實說的是資料集從哪來
所以結論是
RDD
是一個數據集的表示, 不僅表示了資料集, 還表示了這個資料集從哪來, 如何計算但是問題是, 誰來計算 ? 如果為一臺汽車設計了一個設計圖, 那麼設計圖自己生產汽車嗎 ?
-
- 問題二: 誰來計算 ?
-
前面我們明確了兩件事,
RDD
在哪被計算? 在Executor
中.RDD
是什麼? 是一個數據集以及其如何計算的圖紙.直接使用
Executor
也是不合適的, 因為一個計算的執行總是需要一個容器, 例如JVM
是一個程序, 只有程序中才能有執行緒, 所以這個計算RDD
的執行緒應該執行在一個程序中, 這個程序就是Exeutor
,Executor
有如下兩個職責-
和
Driver
保持互動從而認領屬於自己的任務 -
接受任務後, 執行任務
-
所以, 應該由一個執行緒來執行
問題三: Task 該如何設計 ?RDD
的計算任務, 而Executor
作為執行這個任務的容器, 也就是一個程序, 用於建立和執行執行緒, 這個執行具體計算任務的執行緒叫做Task
第一個想法是每個
RDD
都由一個Task
來計算 第二個想法是一整個邏輯執行圖中所有的RDD
都由一組Task
來執行 第三個想法是分階段執行- 第一個想法: 為每個 RDD 的分割槽設定一組 Task
-
大概就是每個
RDD
都有三個Task
, 每個Task
對應一個RDD
的分割槽, 執行一個分割槽的資料的計算但是這麼做有一個非常難以解決的問題, 就是資料儲存的問題, 例如
Task 1, 4, 7, 10, 13, 16
在同一個流程上, 但是這些Task
之間需要交換資料, 因為這些Task
可能被排程到不同的機器上上, 所以Task1
執行完了資料以後需要暫存, 後交給Task4
來獲取這只是一個簡單的邏輯圖, 如果是一個複雜的邏輯圖, 會有什麼表現? 要儲存多少資料? 無論是放在磁碟還是放在記憶體中, 是不是都是一種極大的負擔?
- 第二個想法: 讓資料流動
-
很自然的, 第一個想法的問題是資料需要儲存和交換, 那不儲存不就好了嗎? 對, 可以讓資料流動起來
第一個要解決的問題就是, 要為資料建立管道(
Pipeline
), 有了管道, 就可以流動簡單來說, 就是為所有的
RDD
有關聯的分割槽使用同一個Task
, 但是就沒問題了嗎? 請關注紅框部分這兩個
RDD
之間是Shuffle
關係, 也就是說, 右邊的RDD
的一個分割槽可能依賴左邊RDD
的所有分割槽, 這樣的話, 資料在這個地方流不動了, 怎麼辦? - 第三個想法: 劃分階段
-
既然在
Shuffle
處資料流不動了, 那就可以在這個地方中斷一下, 後面Stage
部分詳解
為了減少執行任務, 減少資料暫存和交換的機會, 所以需要建立管道, 讓資料沿著管道流動, 其實也就是原先每個
RDD
都有一組Task
, 現在改為所有的RDD
共用一組Task
, 但是也有問題, 問題如下就是說, 在
Shuffle
處, 必須斷開管道, 進行資料交換, 交換過後, 繼續流動, 所以整個流程可以變為如下樣子把
Task
斷開成兩個部分,Task4
可以從Task 1, 2, 3
中獲取資料, 後Task4
又作為管道, 繼續讓資料在其中流動但是還有一個問題, 說斷開就直接斷開嗎? 不用打個招呼的呀? 這個斷開即沒有道理, 也沒有規則, 所以可以為這個斷開增加一個概念叫做階段, 按照階段斷開, 階段的英文叫做
Stage
, 如下所以劃分階段的本身就是設定斷開點的規則, 那麼該如何劃分階段呢?
-
第一步, 從最後一個
RDD
, 也就是邏輯圖中最右邊的RDD
開始, 向前滑動Stage
的範圍, 為Stage0
-
第二步, 遇到
ShuffleDependency
斷開Stage
, 從下一個RDD
開始建立新的Stage
, 為Stage1
-
第三步, 新的
Stage
按照同樣的規則繼續滑動, 直到包裹所有的RDD
總結來看, 就是針對於寬窄依賴來判斷, 一個
Stage
中只有窄依賴, 因為只有窄依賴才能形成資料的Pipeline
.如果要進行
Shuffle
的話, 資料是流不過去的, 必須要拷貝和拉取. 所以遇到RDD
寬依賴的兩個RDD
時, 要切斷這兩個RDD
的Stage
.這樣一個 RDD 依賴的鏈條, 我們稱之為 RDD 的血統, 其中有寬依賴也有窄依賴
val sc = ... val textRDD = sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop")) val splitRDD = textRDD.flatMap(_.split(" ")) val tupleRDD = splitRDD.map((_, 1)) val reduceRDD = tupleRDD.reduceByKey(_ + _) val strRDD = reduceRDD.map(item => s"${item._1}, ${item._2}") strRDD.collect.foreach(item => println(item))
上述程式碼是這個章節我們一直使用的程式碼流程, 如下是其完整的邏輯執行圖
如果放在叢集中執行, 通過
WebUI
可以檢視到如下DAG
結構- Step 1: 從
ResultStage
開始執行 -
最接近
Result
部分的Stage id
為 0, 這個Stage
被稱之為ResultStage
由程式碼可以知道, 最終呼叫
Action
促使整個流程執行的是最後一個RDD
,strRDD.collect
, 所以當執行RDD
的計算時候, 先計算的也是這個RDD
- Step 2:
RDD
之間是有關聯的 -
前面已經知道, 最後一個
RDD
先得到執行機會, 先從這個RDD
開始執行, 但是這個RDD
中有資料嗎 ? 如果沒有資料, 它的計算是什麼? 它的計算是從父RDD
中獲取資料, 並執行傳入的運算元的函式簡單來說, 從產生
Result
的地方開始計算, 但是其RDD
中是沒資料的, 所以會找到父RDD
來要資料, 父RDD
也沒有資料, 繼續向上要, 所以, 計算從Result
處呼叫, 但是從整個邏輯圖中的最左邊RDD
開始, 類似一個遞迴的過程
-