1. 程式人生 > >Spark Core 的核心理論增強

Spark Core 的核心理論增強

1. spark的核心概念

 (1)Application

   表示應用程式,包含一個 Driver Program 和若干 Executor。(編寫的spark程式碼)

 (2)Driver program

   Spark 中的 Driver 即執行上述 Application 的 main()函式並且建立 SparkContext,其中建立 SparkContext 的目的是為了準備 Spark 應用程式的執行環境。由 SparkContext 負責與 ClusterManager 通訊,進行資源的申請,任務的分配和監控等。程式執 行完畢後關閉 SparkContext。

 (3)Cluster Manager

   在 Standalone 模式中即為 Master(主節點),控制整個叢集,監控 Worker。 在 YARN 模式中為資源管理器(resourcemanager)。

 (4)Spark Context

   整個應用的上下文,控制應用程式的生命週期,負責排程各個運算資源, 協調各個 Worker 上的 Executor。初始化的時候,會初始化 DAGScheduler 和 TaskScheduler 兩個核心元件。

 (5)RDD

   Spark 的基本計算單元,一組 RDD 可形成執行的有向無環圖 RDD Graph。

 (6)DAGScheduler

   將application拆分成多個job,對於每一個job構建成一個DAG,將這個DAG劃分成多個stage,最終把stage提交給TaskScheduler。

 (7)TaskScheduler

  將DAGScheduler提交過來的stage,拆分成多個task集合,然後將 TaskSet 提交給 Worker(叢集)執行,每個 Executor 執行什麼 Task 就 是在此處分配的。

 (8)Worker

  叢集中可以執行 Application 程式碼的節點。在 Standalone 模式中指的是通過 slave 檔案配置的 worker 節點,在 Spark on Yarn 模式中指的就是 NodeManager 節點。(即執行Executor的節點)

 (9)Executor

  某個 Application 執行在 Worker 節點上的一個程序,該程序負責執行某些 task, 並且負責將資料存在記憶體或者磁碟上。在 Spark on Yarn 模式下,其程序名稱為 CoarseGrainedExecutorBackend,一個 CoarseGrainedExecutorBackend 程序有且僅有一個 executor 物件,它負責將 Task 包裝成 taskRunner,並從執行緒池中抽取出一個空閒執行緒執行 Task, 這樣,每個 CoarseGrainedExecutorBackend 能並行執行 Task 的資料就取決於分配給它的 CPU 的個數。

 (10)Stage

  每個 Job 會被拆分很多組 Task,每組作為一個 TaskSet,其名稱為 Stage

 (11)Job

包含多個 Task 組成的平行計算,是由 Action 行為觸發的,觸發一次action,就是一個job

 (12)SparkEnv

  執行緒級別的上下文,儲存執行時的重要元件的引用。SparkEnv 內建立幷包含 如下一些重要元件的引用:
   MapOutPutTracker:負責 Shuffle 元資訊的儲存。
   BroadcastManager:負責廣播變數的控制與元資訊的儲存。
   MapOutPutTracker:負責儲存管理、建立和查詢塊.
   MetricsSystem:監控執行時效能指標資訊。
   SparkConf:負責儲存配置資訊。

2. spark的任務執行流程

 (1)基本執行流程:

Spark Core 的核心理論增強
第一步(構建DAG):使用運算元操作RDD進行各種transformation 操作,最後通過action運算元觸發spark的作業提交。提交後,spark會根據轉化過程中所產生的RDD之間依賴關係構建DAG有向無環圖。
第二步(DAG的切割):DAG 切割主要根據 RDD 的依賴是否為寬依賴來決定切割節點,當遇到寬依賴就將任務劃分 為一個新的排程階段(Stage)。每個 Stage 中包含一個或多個 Task。這些 Task 將形成任務集 (TaskSet),提交給底層排程器進行排程執行。
第三步(任務排程):每一個 Spark 任務排程器只為一個 SparkContext 例項服務。當任務排程器收到任務集後負責 把任務集以 Task 任務的形式分發至 Worker 節點的 Executor 程序中執行,如果某個任務失敗, 任務排程器負責重新分配該任務的計算。
第四步(執行任務):當 Executor 收到傳送過來的任務後,將以多執行緒(會在啟動 executor 的時候就初始化好了 一個執行緒池)的方式執行任務的計算,每個執行緒負責一個任務,任務結束後會根據任務的類 型選擇相應的返回方式將結果返回給任務排程器。(cluster manager)。

 (2)大體執行流程:

Spark Core 的核心理論增強

  • 初始化sparkcontext,sparkcontext向資源管理器註冊並申請執行executor資源
  • 資源管理器分配executor資源並啟動,Executor 執行情況將 隨著心跳傳送到資源管理器上
  • Executor向driver反向註冊,告知driver資源準備完畢,可以執行任務
  • sparkcontext構建RDD,然後通過DAGscheduler將RDD切分成多個stage,然後分裝成為taskset交給task schdeluder。
  • taskscheduler向worker中的executor傳送task 執行相應的任務。

    補充:spark on yarn 中的client/cluster的區別?
    spark on yarn -- cluster
    Spark Core 的核心理論增強
    spark on yarn -- client
    Spark Core 的核心理論增強
    注意:在spark作業執行的過程中,一般情況下,會有大量的資料在driver和叢集中進行互動,所有如果使用client模式,則會在程式執行的過程中,造成大量的網路資料傳輸,造成網路流量的激增,而基於cluster模式,因為driver和appmaster在一個節點上,driver本身就在叢集中,所以資料的傳輸也是在叢集內部中完成,網路傳輸壓力相對較小。

      (3)詳細執行流程:

    Spark Core 的核心理論增強
    根據上圖的資訊:
       - 啟動spark叢集,通過spark-shell啟動spark叢集中的相應的master和worker。master是叢集的管理者,清楚叢集中的從節點的個數、從節點的資源情況,以及從節點是否存活。
       - worker節點的註冊,當worker程序啟動之後,向master程序傳送註冊訊息,所以worker是一個基於AKKA actor的事件驅動模型,master同樣也是。worker註冊成功之後,也會向master傳送心跳,監聽主節點是否存在,以及彙報心跳。
       -driver提交作業:driver向spark叢集提交作業,就是向master提交作業,註冊spark應用需要的資源,說白了就是向master申請應用程式執行的資源。
       - master分配資源:當driver提交作業請求之後,mater接收到相應的請求,會向worker節點指派相應的作業任務,就是在worker節點中啟動相應的executor程序,executor維護一個執行緒池,執行緒池中的執行緒是真正去執行task任務。
       - worker啟動executor:當worker節點接收到master啟動executor之後,會相應的啟動一個或者多個executor,並向master彙報啟動成功資訊,表示可以接收任務。
       - 向 driver的反向註冊:當worker節點啟動executor成功之後,會向driver反向註冊,告訴driver哪些executor可以接收任務,執行spark任務。
       - driver接收worker的註冊:driver接收到worker的註冊資訊之後,就初始化相應的 executor_info資訊,根據worker傳送過來的executorid,可以確定,哪些executor對自己服務。
       - driver初始化sparkcontext:sparkcontext構建RDD,然後通過DAGscheduler將RDD切分成多個stage,然後分裝成為taskset交給task schdeluder。
       - taskscheduler向worker中的executor傳送task執行相應的任務
       - executor執行任務:當executor程序接收到了driver傳送過來的taskset之後,進行反序列化然後將這些task分裝到一個叫taskrunner的執行緒中,然後放入到本地的執行緒池中排程相應的作業執行。當執行完畢之後,將所得到的結果進行落地(返回給driver/列印輸出/儲存到本地/儲存到hdfs...)