Spark 基本概念及 jobs stages tasks 等 解釋
基礎概念理解
-
Application
使用者在 spark 上構建的程式,包含了 driver 程式以及在叢集上執行的程式程式碼,物理機器上涉及了 driver,master,worker 三個節點.
-
Driver Program
建立 sc ,定義 udf 函式,定義一個 spark 應用程式所需要的三大步驟的邏輯:載入資料集,處理資料,結果展示。
-
Cluster Manager
叢集的資源管理器,在叢集上獲取資源的外部服務。 拿 Yarn 舉例,客戶端程式會向 Yarn 申請計算我這個任務需要多少的 memory,多少 CPU,etc。 然後 Cluster Manager 會通過排程告訴客戶端可以使用,然後客戶端就可以把程式送到每個 Worker Node 上面去執行了。
-
Worker Node
叢集中任何一個可以執行spark應用程式碼的節點。Worker Node就是物理節點,可以在上面啟動Executor程序。
-
Executor
在每個 Worker Node 上為某應用啟動的一個程序,該程序負責執行任務,並且負責將資料存在記憶體或者磁碟上,每個任務都有各自獨立的 Executor。 Executor 是一個執行 Task 的容器。它的主要職責是:
- 初始化程式要執行的上下文 SparkEnv,解決應用程式需要執行時的 jar 包的依賴,載入類。
- 同時還有一個 ExecutorBackend 向 cluster manager 彙報當前的任務狀態,這一方面有點類似 hadoop的 tasktracker 和 task。
總結:Executor 是一個應用程式執行的監控和執行容器。
-
Jobs
包含很多 task 的平行計算,可以認為是 Spark RDD 裡面的 action,每個 action 的觸發會生成一個job。 使用者提交的 Job 會提交給 DAGScheduler,Job 會被分解成 Stage,Stage 會被細化成 Task,Task 簡單的說就是在一個數據 partition 上的單個數據處理流程。
-
Stage 一個 Job 會被拆分為多組 Task,每組任務被稱為一個 Stage 就像 Map Stage, Reduce Stage。
-
Task
被送到 executor 上的工作單元。
在 Spark 中有兩類 task:
- shuffleMapTask 輸出是shuffle所需資料, stage的劃分也以此為依據,shuffle之前的所有變換是一個stage,shuffle之後的操作是另一個stage。
- resultTask 輸出是result,比如 rdd.parallize(1 to 10).foreach(println) 這個操作沒有shuffle,直接就輸出了,那麼只有它的task是resultTask,stage也只有一個; 如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 這個job因為有reduce,所以有一個shuffle過程,那麼reduceByKey之前的是一個stage,執行shuffleMapTask,輸出shuffle所需的資料,reduceByKey到最後是一個stage,直接就輸出結果了。如果job中有多次shuffle,那麼每個shuffle之前都是一個stage。
-
Partition
Partition 類似 hadoop 的 Split,計算是以 partition 為單位進行的,當然 partition 的劃分依據有很多,這是可以自己定義的,像 HDFS 檔案,劃分的方式就和 MapReduce 一樣,以檔案的 block 來劃分不同的 partition。總而言之,Spark 的 partition 在概念上與 hadoop 中的 split 是相似的,提供了一種劃分資料的方式。
Block與Partition之間區別
- hdfs中的block是分散式儲存的最小單元,類似於盛放檔案的盒子,一個檔案可能要佔多個盒子,但一個盒子裡的內容只可能來自同一份檔案。假設block設定為128M,你的檔案是250M,那麼這份檔案佔3個block(128+128+2)。這樣的設計雖然會有一部分磁碟空間的浪費,但是整齊的block大小,便於快速找到、讀取對應的內容。(p.s. 考慮到hdfs冗餘設計,預設三份拷貝,實際上3*3=9個block的物理空間。)
- spark中的partition 是彈性分散式資料集RDD的最小單元,RDD是由分佈在各個節點上的partition 組成的。partition 是指的spark在計算過程中,生成的資料在計算空間內最小單元,同一份資料(RDD)的partition 大小不一,數量不定,是根據application裡的運算元和最初讀入的資料分塊數量決定的,這也是為什麼叫“彈性分散式”資料集的原因之一。總結:block位於儲存空間、partition 位於計算空間,block的大小是固定的、partition 大小是不固定的,block是有冗餘的、不會輕易丟失,partition(RDD)沒有冗餘設計、丟失之後重新計算得到
-
RDD
每個RDD有5個主要的屬性:
- 一組分片(partition),即資料集的基本組成單位
- 一個計算每個分片的函式
- 對parent RDD的依賴,這個依賴描述了RDD之間的lineage
- 對於key-value的RDD,一個Partitioner,這是可選擇的
- 一個列表,儲存存取每個partition的preferred位置。對於一個HDFS檔案來說,儲存每個partition所在的塊的位置。這也是可選擇的
把上面這5個主要的屬性總結一下,可以得出RDD的大致概念。首先要知道,RDD大概是這樣一種表示資料集的東西,它具有以上列出的一些屬性。是spark專案組設計用來表示資料集的一種資料結構。而spark專案組為了讓RDD能handle更多的問題,又規定RDD應該是隻讀的,分割槽記錄的一種資料集合中。可以通過兩種方式來建立RDD:一種是基於物理儲存中的資料,比如說磁碟上的檔案;另一種,也是大多數建立RDD的方式,即通過其他RDD來建立【以後叫做轉換】而成。而正因為RDD滿足了這麼多特性,所以spark把RDD叫做Resilient Distributed Datasets,中文叫做彈性分散式資料集。
可以總下出幾個它的特性來:
- 不變的資料結構儲存
- 支援跨叢集的分散式資料結構
- 可以根據資料記錄的key對結構進行分割槽
- 提供了粗粒度的操作,且這些操作都支援分割槽
- 它將資料儲存在記憶體中,從而提供了低延遲性
-
cores
每一個 core,相當於一個 worker 上的程序,這些程序會同時執行分配到這個 worker 上的任務。簡單的說,就是 spark manager 把一個 job 切分幾個 task 分發到 worker 上同步執行,而每個 worker 把分配給自己的 task 再切分成幾個 subtask,分配給當前 worker 上的不同程序。
-
Memory
分配給 spark 應用的記憶體有三個方面的應用:
- spark 本身
- spark 應用過程中 runtime 使用,比如 UDF 函式
- spark 應用中的 cache
-
narrow/wide dependences(寬依賴/窄依賴)
Spark中RDD的高效與DAG(有向無環圖)有很大的關係,在DAG排程中需要對計算的過程劃分Stage,劃分的依據就是RDD之間的依賴關係。RDD之間的依賴關係分為兩種,寬依賴(wide dependency/shuffle dependency)和窄依賴(narrow dependency)
- 窄依賴 窄依賴就是指父RDD的每個分割槽只被一個子RDD分割槽使用,子RDD分割槽通常只對應常數個父RDD分割槽,如下圖所示【其中每個小方塊代表一個RDD Partition】 窄依賴有分為兩種: 一種是一對一的依賴,即OneToOneDependency 還有一個是範圍的依賴,即RangeDependency,它僅僅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多個RDD合成一個RDD,這些RDD是被拼接而成,即每個parent RDD的Partition的相對順序不會變,只不過每個parent RDD在UnionRDD中的Partition的起始位置不同
- 寬依賴 寬依賴就是指父RDD的每個分割槽都有可能被多個子RDD分割槽使用,子RDD分割槽通常對應父RDD所有分割槽,如下圖所示【其中每個小方塊代表一個RDD Partition】
-
本地記憶體與叢集記憶體
本地記憶體,是指在 driver 端的程式所需要的記憶體,由 driver 機器提供,一般用來生成測試資料,接受運算結果等; 叢集記憶體,是指提交到叢集的作業能夠向叢集申請的最多記憶體使用量,一般用來儲存關鍵資料
-
shuffle shuffle 是兩個 stage 之間的資料傳輸過程。
RDD內在 與 Spark Stage
當使用流式資料來源例如 Kafka 時,它會根據所讀取的 Topic 中的 Partition 數量來建立 RDD 中的 Partition,一一對應,這樣就能並行讀取源資料,理論上 Kafka 的 Partition 越多,併發程度越好。 對於 RDD 的 Partition 數量, 使用 Spark Streaming 的 Kafka 套件來說,是與 Kafka 的 Partition 相同,對於普通 RDD 而言,如初始化時無指定 Partition 數量,預設是資源 CPU 個數。可以通過 Spark UI 檢視 Task 數量確定
Spark 中,Partition 的數量很大程度上影響 Spark 排程,執行一個任務的效率,且 Partition 的數量並非一塵不變,根據你所提交的操作,會動態的改變新生成 RDD 中的 Partition 數量。 這類操作被分為大致兩類 寬依賴,窄依賴
RDD 在各種操作中會不斷生成新的 RDD,其中的 Partition 可能隨之而變。
- 例如 map,flatMap 即窄依賴,不會改變 Partition數量,新 RDD 和 舊 RDD 的 Partition 是一致的
- reduceByKey 即寬依賴,會改變 Partition 的數量
- Partition 數量的改變就是 Spark 劃分不同 Stage 的標誌
Stage是 Spark 任務排程的階段劃分概念,只有先執行完前面的 Stage,後面的 Stage 才能被執行。 每個 Stage 中按照 Partition 生成一個 task,所有 task 組成 taskset 放進 任務排程其中去排程執行 整個過程叫做 DAG排程
DAG排程 在 Spark 中主要依賴於 Spark 接收到一個任務後對整條關係鏈的劃分
# ---------- map, faltMap ---------- #
# | Part-1 | -------------------> | Part-5 | #
# | Part-2 | -------------------> | Part-6 | #
# | Part-3 | -------------------> | Part-7 | #
# | Part-4 | -------------------> | Part-8 | #
# ---------- 窄依賴 ---------- #
# Stage 1 #