1. 程式人生 > 實用技巧 >spark學習進度13(spark和高階特效)

spark學習進度13(spark和高階特效)

    這一節基本上全是概念:::

更新的幾種寫法:

部署:

邏輯:

其實 RDD 並沒有什麼嚴格的邏輯執行圖和物理執行圖的概念, 這裡也只是借用這個概念, 從而讓整個 RDD 的原理可以解釋, 好理解.

對於 RDD 的邏輯執行圖, 起始於第一個入口 RDD 的建立, 結束於 Action 運算元執行之前, 主要的過程就是生成一組互相有依賴關係的 RDD, 其並不會真的執行, 只是表示 RDD 之間的關係, 資料的流轉過程.

物理:

當觸發 Action 執行的時候, 這一組互相依賴的 RDD 要被處理, 所以要轉化為可執行的物理執行圖, 排程到叢集中執行.

因為大部分 RDD 是不真正存放資料的, 只是資料從中流轉, 所以, 不能直接在叢集中執行 RDD, 要有一種 Pipeline 的思想, 需要將這組 RDD 轉為 Stage 和 Task, 從而執行 Task, 優化整體執行速度.

以上的邏輯執行圖會生成如下的物理執行圖, 這一切發生在 Action 操作被執行時.

從上圖可以總結如下幾個點

  • 在第一個Stage中, 每一個這樣的執行流程是一個Task, 也就是在同一個 Stage 中的所有 RDD 的對應分割槽, 在同一個 Task 中執行

  • Stage 的劃分是由 Shuffle 操作來確定的, 有 Shuffle 的地方, Stage 斷開

textFile運算元的背後

研究RDD的功能或者表現的時候, 其實本質上研究的就是RDD中的五大屬性, 因為RDD透過五大屬性來提供功能和表現, 所以如果要研究textFile這個運算元, 應該從五大屬性著手, 那麼第一步就要看看生成的RDD是什麼型別的RDD

  1. textFile生成的是HadoopRDD

    除了上面這一個步驟以外, 後續步驟將不再直接基於程式碼進行講解, 因為從程式碼的角度著手容易迷失邏輯, 這個章節的初心有兩個, 一個是希望大家瞭解 Spark 的內部邏輯和原理, 另外一個是希望大家能夠通過本章學習具有程式碼分析的能力

  2. HadoopRDDPartitions對應了HDFSBlocks

    其實本質上每個HadoopRDDPartition都是對應了一個HadoopBlock, 通過InputFormat來確定Hadoop中的Block的位置和邊界, 從而可以供一些運算元使用

  3. HadoopRDDcompute函式就是在讀取HDFS中的Block

    本質上,compute還是依然使用InputFormat來讀取HDFS中對應分割槽的Block

  4. textFile這個運算元生成的其實是一個MapPartitionsRDD

    textFile這個運算元的作用是讀取HDFS上的檔案, 但是HadoopRDD中存放是一個元組, 其Key是行號, 其ValueHadoop中定義的Text物件, 這一點和MapReduce程式中的行為是一致的

    但是並不適合Spark的場景, 所以最終會通過一個map運算元, 將(LineNum, Text)轉為String形式的一行一行的資料, 所以最終textFile這個運算元生成的RDD並不是HadoopRDD, 而是一個MapPartitionsRDD

map運算元的背後
  • map運算元生成了MapPartitionsRDD

    由原始碼可知, 當val rdd2 = rdd1.map()的時候, 其實生成的新RDDrdd2,rdd2的型別是MapPartitionsRDD, 每個RDD中的五大屬性都會有一些不同, 由map運算元生成的RDD中的計算函式, 本質上就是遍歷對應分割槽的資料, 將每一個數據轉成另外的形式

  • MapPartitionsRDD的計算函式是collection.map( function )

    真正執行的叢集中的處理單元是Task, 每個Task對應一個RDD的分割槽, 所以collection對應一個RDD分割槽的所有資料, 而這個計算的含義就是將一個RDD的分割槽上所有資料當作一個集合, 通過這個Scala集合的map運算元, 來執行一個轉換操作, 其轉換操作的函式就是傳入map運算元的function

  • 傳入map運算元的函式會被清理

    這個清理主要是處理閉包中的依賴, 使得這個閉包可以被序列化發往不同的叢集節點執行

flatMap運算元的背後

flatMapmap運算元其實本質上是一樣的, 其步驟和生成的RDD都是一樣, 只是對於傳入函式的處理不同,mapcollect.map( function )flatMapcollect.flatMap( function )

從側面印證了, 其實Spark中的flatMapScala基礎中的flatMap其實是一樣的

textRDDsplitRDDtupleRDD

textRDDsplitRDD再到tupleRDD的過程, 其實就是呼叫mapflatMap運算元生成新的RDD的過程, 所以如下圖所示, 就是這個階段所生成的邏輯計劃

總結
如何生成RDD?

生成RDD的常見方式有三種

  • 從本地集合建立

  • 從外部資料集建立

  • 從其它RDD衍生

通過外部資料集建立RDD, 是通過Hadoop或者其它外部資料來源的SDK來進行資料讀取, 同時如果外部資料來源是有分片的話,RDD會將分割槽與其分片進行對照

通過其它RDD衍生的話, 其實本質上就是通過不同的運算元生成不同的RDD的子類物件, 從而控制compute函式的行為來實現運算元功能

生成哪些RDD?

不同的運算元生成不同的RDD, 生成RDD的型別取決於運算元, 例如mapflatMap都會生成RDD的子類MapPartitions的物件

如何計算RDD中的資料 ?

雖然前面我們提到過RDD是偏向計算的, 但是其實RDD還只是表示資料, 縱觀RDD的五大屬性中有三個是必須的, 分別如下

  • Partitions List分割槽列表

  • Compute function計算函式

  • Dependencies依賴

雖然計算函式是和計算有關的, 但是隻有呼叫了這個函式才會進行計算,RDD顯然不會自己呼叫自己的Compute函式, 一定是由外部呼叫的, 所以RDD更多的意義是用於表示資料集以及其來源, 和針對於資料的計算

所以如何計算RDD中的資料呢? 一定是通過其它的元件來計算的, 而計算的規則, 由RDD中的Compute函式來指定, 不同型別的RDD子類有不同的Compute函式什麼是RDD之間的依賴關係?

  • 什麼是關係(依賴關係) ?

    從運算元視角上來看,splitRDD通過map運算元得到了tupleRDD, 所以splitRDDtupleRDD之間的關係是map

    但是僅僅這樣說, 會不夠全面, 從細節上來看,RDD只是資料和關於資料的計算, 而具體執行這種計算得出結果的是一個神祕的其它元件, 所以, 這兩個RDD的關係可以表示為splitRDD的資料通過map操作, 被傳入tupleRDD, 這是它們之間更細化的關係

    但是RDD這個概念本身並不是資料容器, 資料真正應該存放的地方是RDD的分割槽, 所以如果把視角放在資料這一層面上的話, 直接講這兩個 RDD 之間有關係是不科學的, 應該從這兩個 RDD 的分割槽之間的關係來討論它們之間的關係

  • 那這些分割槽之間是什麼關係?

    如果僅僅說splitRDDtupleRDD之間的話, 那它們的分割槽之間就是一對一的關係

    但是tupleRDDreduceRDD呢?tupleRDD通過運算元reduceByKey生成reduceRDD, 而這個運算元是一個Shuffle操作,Shuffle操作的兩個RDD的分割槽之間並不是一對一,reduceByKey的一個分割槽對應tupleRDD的多個分割槽

reduceByKey運算元會生成ShuffledRDD

reduceByKey是由運算元combineByKey來實現的,combineByKey內部會建立ShuffledRDD返回, 具體的程式碼請大家通過IDEA來進行檢視, 此處不再截圖, 而整個reduceByKey操作大致如下過程

去掉兩個reducer端的分割槽, 只留下一個的話, 如下

所以, 對於reduceByKey這個Shuffle操作來說,reducer端的一個分割槽, 會從多個mapper端的分割槽拿取資料, 是一個多對一的關係

至此為止, 出現了兩種分割槽見的關係了, 一種是一對一, 一種是多對一

整體上的流程圖

物理圖的作用是什麼?
問題一: 物理圖的意義是什麼?

物理圖解決的其實就是RDD流程生成以後, 如何計算和執行的問題, 也就是如何把 RDD 放在叢集中執行的問題

問題二: 如果要確定如何執行的問題, 則需要先確定叢集中有什麼元件
  • 首先叢集中物理元件就是一臺一臺的機器

  • 其次這些機器上跑的守護程序有兩種:Master,Worker

    • 每個守護程序其實就代表了一臺機器, 代表這臺機器的角色, 代表這臺機器和外界通訊

    • 例如我們常說一臺機器是Master, 其含義是這臺機器中運行了一個Master守護程序, 如果一臺機器運行了Master的同時又運行了Worker, 則說這臺機器是Master也可以, 說它是Worker也行

  • 真正能執行RDD的元件是:Executor, 也就是說其實RDD最終是執行在Executor中的, 也就是說, 無論是Master還是Worker其實都是用於管理Executor和排程程式的

結論是RDD一定在Executor中計算, 而MasterWorker負責排程和管理Executor

問題三: 物理圖的生成需要考慮什麼問題?
  • 要計算RDD, 不僅要計算, 還要很快的計算 → 優化效能

  • 要考慮容錯, 容錯的常見手段是快取 →RDD要可以快取

結論是在生成物理圖的時候, 不僅要考慮效率問題, 還要考慮一種更合適的方式, 讓RDD執行的更好

誰來計算 RDD ?
問題一: RDD 是什麼, 用來做什麼 ?

回顧一下RDD的五個屬性

  • A list of partitions

  • A function for computing each split

  • A list of dependencies on other RDDs

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

簡單的說就是: 分割槽列表, 計算函式, 依賴關係, 分割槽函式, 最佳位置

  • 分割槽列表, 分割槽函式, 最佳位置, 這三個屬性其實說的就是資料集在哪, 在哪更合適, 如何分割槽

  • 計算函式和依賴關係, 這兩個屬性其實說的是資料集從哪來

所以結論是RDD是一個數據集的表示, 不僅表示了資料集, 還表示了這個資料集從哪來, 如何計算

但是問題是, 誰來計算 ? 如果為一臺汽車設計了一個設計圖, 那麼設計圖自己生產汽車嗎 ?

問題二: 誰來計算 ?

前面我們明確了兩件事,RDD在哪被計算? 在Executor中.RDD是什麼? 是一個數據集以及其如何計算的圖紙.

直接使用Executor也是不合適的, 因為一個計算的執行總是需要一個容器, 例如JVM是一個程序, 只有程序中才能有執行緒, 所以這個計算RDD的執行緒應該執行在一個程序中, 這個程序就是Exeutor,Executor有如下兩個職責

  • Driver保持互動從而認領屬於自己的任務

  • 接受任務後, 執行任務

所以, 應該由一個執行緒來執行RDD的計算任務, 而Executor作為執行這個任務的容器, 也就是一個程序, 用於建立和執行執行緒, 這個執行具體計算任務的執行緒叫做Task

問題三: Task 該如何設計 ?

第一個想法是每個RDD都由一個Task來計算 第二個想法是一整個邏輯執行圖中所有的RDD都由一組Task來執行 第三個想法是分階段執行

第一個想法: 為每個 RDD 的分割槽設定一組 Task

大概就是每個RDD都有三個Task, 每個Task對應一個RDD的分割槽, 執行一個分割槽的資料的計算

但是這麼做有一個非常難以解決的問題, 就是資料儲存的問題, 例如Task 1, 4, 7, 10, 13, 16在同一個流程上, 但是這些Task之間需要交換資料, 因為這些Task可能被排程到不同的機器上上, 所以Task1執行完了資料以後需要暫存, 後交給Task4來獲取

這只是一個簡單的邏輯圖, 如果是一個複雜的邏輯圖, 會有什麼表現? 要儲存多少資料? 無論是放在磁碟還是放在記憶體中, 是不是都是一種極大的負擔?

二個想法: 讓資料流動

很自然的, 第一個想法的問題是資料需要儲存和交換, 那不儲存不就好了嗎? 對, 可以讓資料流動起來

第一個要解決的問題就是, 要為資料建立管道(Pipeline), 有了管道, 就可以流動

簡單來說, 就是為所有的RDD有關聯的分割槽使用同一個Task, 但是就沒問題了嗎? 請關注紅框部分

這兩個RDD之間是Shuffle關係, 也就是說, 右邊的RDD的一個分割槽可能依賴左邊RDD的所有分割槽, 這樣的話, 資料在這個地方流不動了, 怎麼辦?

第三個想法: 劃分階段

既然在Shuffle處資料流不動了, 那就可以在這個地方中斷一下, 後面Stage部分詳解

如何劃分階段 ?

為了減少執行任務, 減少資料暫存和交換的機會, 所以需要建立管道, 讓資料沿著管道流動, 其實也就是原先每個RDD都有一組Task, 現在改為所有的RDD共用一組Task, 但是也有問題, 問題如下

就是說, 在Shuffle處, 必須斷開管道, 進行資料交換, 交換過後, 繼續流動, 所以整個流程可以變為如下樣子

Task斷開成兩個部分,Task4可以從Task 1, 2, 3中獲取資料, 後Task4又作為管道, 繼續讓資料在其中流動

但是還有一個問題, 說斷開就直接斷開嗎? 不用打個招呼的呀? 這個斷開即沒有道理, 也沒有規則, 所以可以為這個斷開增加一個概念叫做階段, 按照階段斷開, 階段的英文叫做Stage, 如下

所以劃分階段的本身就是設定斷開點的規則, 那麼該如何劃分階段呢?

  1. 第一步, 從最後一個RDD, 也就是邏輯圖中最右邊的RDD開始, 向前滑動Stage的範圍, 為Stage0

  2. 第二步, 遇到ShuffleDependency斷開Stage, 從下一個RDD開始建立新的Stage, 為Stage1

  3. 第三步, 新的Stage按照同樣的規則繼續滑動, 直到包裹所有的RDD

總結來看, 就是針對於寬窄依賴來判斷, 一個Stage中只有窄依賴, 因為只有窄依賴才能形成資料的Pipeline.

如果要進行Shuffle的話, 資料是流不過去的, 必須要拷貝和拉取. 所以遇到RDD寬依賴的兩個RDD時, 要切斷這兩個RDDStage.

這樣一個 RDD 依賴的鏈條, 我們稱之為 RDD 的血統, 其中有寬依賴也有窄依賴

資料怎麼流動 ?
val sc = ...

val textRDD = sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop"))
val splitRDD = textRDD.flatMap(_.split(" "))
val tupleRDD = splitRDD.map((_, 1))
val reduceRDD = tupleRDD.reduceByKey(_ + _)
val strRDD = reduceRDD.map(item => s"${item._1}, ${item._2}")

strRDD.collect.foreach(item => println(item))

上述程式碼是這個章節我們一直使用的程式碼流程, 如下是其完整的邏輯執行圖

如果放在叢集中執行, 通過WebUI可以檢視到如下DAG結構

Step 1: 從ResultStage開始執行

最接近Result部分的Stage id為 0, 這個Stage被稱之為ResultStage

由程式碼可以知道, 最終呼叫Action促使整個流程執行的是最後一個RDD,strRDD.collect, 所以當執行RDD的計算時候, 先計算的也是這個RDD

Step 2:RDD之間是有關聯的

前面已經知道, 最後一個RDD先得到執行機會, 先從這個RDD開始執行, 但是這個RDD中有資料嗎 ? 如果沒有資料, 它的計算是什麼? 它的計算是從父RDD中獲取資料, 並執行傳入的運算元的函式

簡單來說, 從產生Result的地方開始計算, 但是其RDD中是沒資料的, 所以會找到父RDD來要資料, 父RDD也沒有資料, 繼續向上要, 所以, 計算從Result處呼叫, 但是從整個邏輯圖中的最左邊RDD開始, 類似一個遞迴的過程