1. 程式人生 > 其它 >大資料Hadoop之——計算引擎Spark

大資料Hadoop之——計算引擎Spark

目錄

一、概述

Apache Spark 是專為大規模資料處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是——Job中間輸出結果可以儲存在記憶體中,從而不再需要讀寫HDFS,因此Spark能更好地適用於資料探勘與機器學習等需要迭代的MapReduce的演算法。

官方地址

1)Spark特點

  • 高效性:不同於MapReduce將中間計算結果放入磁碟中,Spark採用記憶體儲存中間計算結果,減少了迭代運算的磁碟IO,並通過平行計算DAG圖的優化,減少了不同任務之間的依賴,降低了延遲等待時間。記憶體計算下,Spark 比 MapReduce 快100倍。
  • 通用性:Spark提供了統一的解決方案。Spark可以用於批處理、互動式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。
  • 易用性:不同於MapReduce僅支援Map和Reduce兩種程式設計運算元,Spark提供了超過80種不同的Transformation和Action運算元,如map,reduce,filter,groupByKey,sortByKey,foreach等,並且採用函數語言程式設計風格,實現相同的功能需要的程式碼量極大縮小。
  • 相容性:Spark能夠跟很多開源工程相容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和排程器,並且Spark可以讀取多種資料來源,如HDFS、HBase、MySQL等。
  • 容錯性高:Spark引進了彈性分散式資料集RDD (Resilient Distributed Dataset) 的抽象,它是分佈在一組節點中的只讀物件集合,這些集合是彈性的,如果資料集一部分丟失,則可以根據“血統”(即充許基於資料衍生過程)對它們進行重建。另外在RDD計算時可以通過CheckPoint來實現容錯,而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates,使用者可以控制採用哪種方式來實現容錯。
  • 適用場景廣泛:大資料分析統計,實時資料處理,圖計算及機器學習。

2)Spark適用場景

  • 複雜的批量處理(Batch Data Processing),偏重點在於處理海量資料的能力,至於處理速度可忍受,通常的時間可能是在數十分鐘到數小時。
  • 基於歷史資料的互動式查詢(Interactive Query),通常的時間在數十秒到數十分鐘之間。
  • 基於實時資料流的資料處理(Streaming Data Processing),通常在數百毫秒到數秒之間。

二、Spark核心元件

  • Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的。
  • Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行互動的API。每個資料庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。Spark提供的sql形式的對接Hive、JDBC、HBase等各種資料渠道的API,用Java開發人員的思想來講就是面向介面、解耦合,ORMapping、Spring Cloud Stream等都是類似的思想。
  • Spark Streaming:基於SparkCore實現的可擴充套件、高吞吐、高可靠性的實時資料流處理。支援從Kafka、Flume等資料來源處理後儲存到HDFS、DataBase、Dashboard中。對實時資料流進行處理和控制。Spark Streaming允許程式能夠像普通RDD一樣處理實時資料。
  • MLlib:一個常用機器學習演算法庫,演算法被實現為對RDD的Spark操作。這個庫包含可擴充套件的學習演算法,比如分類、迴歸等需要對大量資料集進行迭代的操作。

三、Spark專業術語詳解

1)Application:Spark應用程式

指的是使用者編寫的Spark應用程式,包含了Driver功能程式碼和分佈在叢集中多個節點上執行的Executor程式碼。Spark應用程式,由一個或多個作業JOB組成,如下圖所示:

2)Driver:驅動程式

Spark中的Driver即執行上述Application的Main()函式並且建立SparkContext,其中建立SparkContext的目的是為了準備Spark應用程式的執行環境。在Spark中由SparkContext負責和ClusterManager通訊,進行資源的申請、任務的分配和監控等;當Executor部分執行完畢後,Driver負責將SparkContext關閉。通常SparkContext代表Driver,如下圖所示:

3)Cluster Manager:資源管理器

指的是在叢集上獲取資源的外部服務,常用的有:StandaloneSpark原生的資源管理器,由Master負責資源的分配;Haddop Yarn,由Yarn中的ResearchManager負責資源的分配;Messos,由Messos中的Messos Master負責資源管理。

4)Executor:執行器

Application執行在Worker節點上的一個程序,該程序負責執行Task,並且負責將資料存在記憶體或者磁碟上,每個Application都有各自獨立的一批Executor,如下圖所示:

5)Worker:計算節點

叢集中任何可以執行Application程式碼的節點,類似於Yarn中的NodeManager節點。在Standalone模式中指的就是通過Slave檔案配置的Worker節點,在Spark on Yarn模式中指的就是NodeManager節點,在Spark on Messos模式中指的就是Messos Slave節點,如下圖所示:

6)RDD:彈性分散式資料集

Resillient Distributed Dataset,Spark的基本計算單元,可以通過一系列運算元進行操作(主要有Transformation和Action操作),如下圖所示:

7)窄依賴

父RDD每一個分割槽最多被一個子RDD的分割槽所用;表現為一個父RDD的分割槽對應於一個子RDD的分割槽,或兩個父RDD的分割槽對應於一個子RDD 的分割槽。如圖所示:

8)寬依賴

父RDD的每個分割槽都可能被多個子RDD分割槽所使用,子RDD分割槽通常對應所有的父RDD分割槽。如圖所示:

  • 常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned :如果JoinAPI之前被呼叫的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分割槽數量一致,join結果的rdd分割槽數量也一樣,這個時候join api是窄依賴)。
  • 常見的寬依賴有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned :除此之外的,rdd 的join api是寬依賴)。

9)DAG:有向無環圖

Directed Acycle graph,反應RDD之間的依賴關係,如圖所示:

10)DAGScheduler:有向無環圖排程器

基於DAG劃分Stage 並以TaskSet的形勢提交Stage給TaskScheduler;負責將作業拆分成不同階段的具有依賴關係的多批任務;最重要的任務之一就是:計算作業和任務的依賴關係,制定排程邏輯。在SparkContext初始化的過程中被例項化,一個SparkContext對應建立一個DAGScheduler。如圖所示:

11)TaskScheduler:任務排程器

將Taskset提交給worker(叢集)執行並回報結果;負責每個具體任務的實際物理排程。如圖所示:

12)Job:作業

由一個或多個排程階段所組成的一次計算作業;包含多個Task組成的平行計算,往往由Spark Action催生,一個JOB包含多個RDD及作用於相應RDD上的各種Operation。如圖所示:

13)Stage:排程階段

一個任務集對應的排程階段;每個Job會被拆分很多組Task,每組任務被稱為Stage,也可稱TaskSet,一個作業分為多個階段;Stage分成兩種型別ShuffleMapStage、ResultStage。如圖所示:

14)TaskSet:任務集

由一組關聯的,但相互之間沒有Shuffle依賴關係的任務所組成的任務集。如圖所示:

15)Task:任務

被送到某個Executor上的工作任務;單個分割槽資料集上的最小處理流程單元。如圖所示:

總體如圖所示:

四、Spark執行基本流程

Spark執行基本流程,如下圖:

計算流程:

七,Spark支援的資源管理器

Spark與資源管理器無關,只要能夠獲取executor程序,並能保持相互通訊就可以了,Spark支援資源管理器包含: Standalone(Spark)、On Mesos、On YARN、Or On K8S,當然還有local模式。

模式 含義
local 在本地執行,只有一個工作程序,無平行計算能力
local[K] 在本地執行,有 K 個工作程序,通常設定 K 為機器的CPU 核心數量
local[*] 在本地執行,工作程序數量等於機器的 CPU 核心數量。
spark://HOST:PORT 以 Standalone 模式執行,這是 Spark 自身提供的叢集執行模式,預設埠號: 7077
mesos://HOST:PORT 在 Mesos 叢集上執行,Driver 程序和 Worker 程序執行在 Mesos 叢集上,部署模式必須使用固定值:--deploy-mode cluster
yarn 在yarn叢集上執行,依賴於hadoop叢集,yarn資源排程框架,將應用提交給yarn,在ApplactionMaster(相當於Stand alone模式中的Master)中執行driver,在叢集上排程資源,開啟excutor執行任務。
k8s 在k8s叢集上執行

七、Spark環境搭建(Spark on Yarn)

1)下載

Spark下載地址:http://spark.apache.org/downloads.html


這裡需要注意版本,我的hadoop版本是3.3.1,這裡spark就下載最新版本的3.2.0,而Spark3.2.0依賴的Scala的2.13,所以後面用到Scala程式設計時注意Scala的版本。

$ cd /opt/bigdata/hadoop/software
# 下載
$ wget https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
# 解壓
$ tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz -C /opt/bigdata/hadoop/server/

2)修改配置檔案

# 進入spark配置目錄
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf
# copy 一個模板配置
$ cp spark-env.sh.template spark-env.sh

在spark-env.sh下加入如下配置

# Hadoop 的配置檔案目錄
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
# YARN 的配置檔案目錄
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
# SPARK 的目錄
export SPARK_HOME=/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
# SPARK 執行檔案目錄
export PATH=$SPARK_HOME/bin:$PATH

複製/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 到其它節點

$ scp -r /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 hadoop-node3:/opt/bigdata/hadoop/server/

3)配置環境變數

在/etc/profile檔案中追加如下內容:

export SPARK_HOME=/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
export PATH=$SPARK_HOME/bin:$PATH

source 載入生效

$ source /etc/profile

4)執行SparkPi(圓周率) 測試驗證

spark-submit 詳細引數說明

引數名 引數說明
--master master 的地址,提交任務到哪裡執行,例如 spark://host:port, yarn, local
--deploy-mode 在本地 (client) 啟動 driver 或在 cluster 上啟動,預設是 client
--class 應用程式的主類,僅針對 java 或 scala 應用
--name 應用程式的名稱
--jars 用逗號分隔的本地 jar 包,設定後,這些 jar 將包含在 driver 和 executor 的 classpath 下
--packages 包含在driver 和executor 的 classpath 中的 jar 的 maven 座標
--exclude-packages 為了避免衝突 而指定不包含的 package
--repositories 遠端 repository
--conf PROP=VALUE 指定 spark 配置屬性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"
--properties-file 載入的配置檔案,預設為 conf/spark-defaults.conf
--driver-memory Driver記憶體,預設 1G
--driver-java-options 傳給 driver 的額外的 Java 選項
--driver-library-path 傳給 driver 的額外的庫路徑
--driver-class-path 傳給 driver 的額外的類路徑
--driver-cores Driver 的核數,預設是1。在 yarn 或者 standalone 下使用
--executor-memory 每個 executor 的記憶體,預設是1G
--total-executor-cores 所有 executor 總共的核數。僅僅在 mesos 或者 standalone 下使用
--num-executors 啟動的 executor 數量。預設為2。在 yarn 下使用
--executor-core 每個 executor 的核數。在yarn或者standalone下使用
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--num-executors 3 \
--executor-memory 1G \
--executor-cores 1 \
/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.0.jar 100


如果看到控制檯出現這個,說明執行成功。

檢視yarn任務

檢視任務日誌


【注意】預設情況下,Hadoop歷史服務historyserver是沒有啟動的,我們可以通過下面的命令來啟動Hadoop歷史伺服器。檢視日誌依賴於historyserver服務

#啟動JobHistoryServer服務
$ mapred --daemon start historyserver
#檢視程序
$ jps
#停止JobHistoryServer服務
$ mapred --daemon stop historyserver


至此已經完成的Spark on Yarn 的環境搭建,並通過測試SparkPi的執行成功了。