1. 程式人生 > >spark大資料架構初學入門基礎詳解

spark大資料架構初學入門基礎詳解

Spark是什麼

a) 是一種通用的大資料計算框架

b) Spark Core 離線計算

        Spark SQL 互動式查詢

 Spark Streaming 實時流式計算

 Spark MLlib 機器學習

 Spark GraphX 圖計算

c) 特點:

i. 一站式:一個技術堆疊解決大資料領域的計算問題

ii. 基於記憶體

d) Spark2009年誕生於伯克利大學的AMPLab實驗室

2010年正式開源了Spark專案

2013Spark成為Apache下的專案

2014年飛速發展,成為Apache的頂級專案

2015年在國內興起,代替mr,hive,storm

作者:辛湜(shi)

e) SparkHive

Spark優點:

i. 速度快

ii. Spark SQL支援大量不同的資料來源

f) Spark Storm

i. 計算模型不一樣

ii. Spark吞吐量大

g) 特點:快,易用,通用,相容性

h) spark執行模式

i. local(本地)

ii. standalone(叢集)

iii. on yarn(yarn作為資源排程Spark負責任務排程和計算)

iv. on mesos(mesos作為資源排程S)

v. on cloud()

i) 配置步驟

=======================on yarn====================

【說明】

1. spark任務執行在yarn上,由yarn來進行資源排程和管理,spark只負責任務的排程 和計算

2. 不需要配置和啟動spark叢集

3. 只需要在提交任務的節點上安裝並配置spark on yarn 模式

4. 必須找一臺節點安裝spark

5. 步驟:

i. 安裝配置JDK

ii. vi spark-env.sh

1. export  JAVA_HOME=/opt/modules/jdk1.7_6.0

2. export  HADOOP_CONF_DIR = /opt/modules/hadoop/etc/hadoop

iii. 測試spark on yarn 模式是否安裝成功

iv. 網路測試:

http://hadoop-yarn1.beicai.com:8088

=====================sdandalone模式==============

【說明】

1.  spark執行在spark 叢集上,由spark進行資源排程管理,同時還負責任務的排程和 計算

2. 需要配置和啟動spark叢集

3. 步驟:

i. 安裝配置JDK

ii. 上傳並解壓Spark

iii. 建立軟連線 ln -s spark spark 或者修改名稱

iv. 配置環境變數

v. 安裝配置Spark,修改spark配置檔案(spark-env.sh, slaves)

1. vi spark-env.sh

a) export  JAVA_HOME=/opt/modules/jdk(jdk位置)

b) export SPARK_MASTER_IP=hadoop-yarn1.beicai.com

c) export SPARK_MASTER_PORT=7077

2.  vi slaves(用於指定在哪些節點上啟動worker)

a) hadoop-yarn2.beicai.com

hadoop-yarn3.beicai.com

vi. spark傳送給其他主機

vii. 啟動

/opt/modules/spark/bin/start-all.sh

vii. 檢視SparkUI介面:http://hadoop-yarn1.beicai.com:8080

4. 

j) 

一、Spark原理

  1Spark的執行原理

i、分散式

Ii、主要基於記憶體(少數情況基於磁碟)

Iii、迭代式計算

2、Spark 計算模式 VS  MapReduce  計算模式對比

          Mr這種計算模型比較固定,只有兩種階段,map階段和reduce階段,兩個階段結束    後,任務就結束了,這意味著我們的操作很有限,只能在map階段和reduce階段,    也同時意味著可能需要多個mr任務才能處理完這個job

   Spark 是迭代式計算,一個階段結束後,後面可以有多個階段,直至任務計算完      成,也就意味著我們可以做很多的操作,這就是Spark計算模型比mr 強大的地方

三、什麼是Spark RDD

1、什麼是RDD

彈性的,分散式的,資料集

RDD在邏輯上可以看出來是代表一個HDFS上的檔案,他分為多個分割槽,散落 Spark的多個節點上)

3、RDD----彈性

RDD的某個分割槽的資料儲存到某個節點上,當這個節點的記憶體有限,儲存不了這個 分割槽的全部資料時,Spark就會有選擇性的將部分資料儲存到硬碟上,例如:當worker 的記憶體只能儲存20w條資料時,但是RDD的這個分割槽有30w條資料,這時候Spark 會將多餘的10w條資料,儲存到硬碟上去。Spark的這種有選擇性的在記憶體和硬碟之間的權衡機制就是RDD的彈性特點所在

4、Spark的容錯性

RDD最重要的特性就是,提供了容錯性,可以自動的從失敗的節點上恢復過來,即如 果某個節點上的RDD partition(資料),因為節點的故障丟了,那麼RDD會自動的通過 自己的資料來源重新計算該partition,這一切對使用者來說是透明的


2、Spark的開發型別

   1)、核心開發:離線批處理 / 演示性的互動式資料處理

       2)、SQL查詢:底層都是RDD和計算操作

       3)、底層都是RDD和計算操作

       4)、機器學習

       5)、圖計算

3Spark 核心開發(Spark-core == Spark-RDD)步驟

   1)、建立初始的RDD

   2)、對初始的RDD進行轉換操作形成新的RDD,然後對新的RDD再進行操作,直 至操作計算完成

3)、將最後的RDD的資料儲存到某種介質中(hivehdfsMySQLhbase...

五、Spark原理

DriverMasterWorkerExecutorTask各個節點之間的聯絡

Spark中的各節點的作用:

1、driver的作用:

    1)、 向master進行任務的註冊

(2)、構建執行任務的基本環境

(3)、接受該任務的executor的反向註冊

(4)、向屬於該任務的executor分配任務

2、什麼是driver

   我們編寫的程式打成jar包後,然後找一臺能夠連線spark叢集的節點做任務的driver,具體的表現為SparkSubmit

3、Master的作用:

   1)、監控叢集;

   2)、動態感知worker的上下線;

   3)、接受driver端註冊請求;

   4)、任務資源的排程

4、Worker的作用:

   1)、定時向master彙報狀態;

   2)、接受master資源排程命令,進行資源的排程

   3)、啟動任務的容器Executor

5、Executor的作用:

   1)、儲存計算的RDD分割槽資料;

   2)、向Driver反向註冊;

   3)、接受Driver端傳送來的任務Task,作用在RDD上進行執行

Spark 程式設計的流程:

1、我們編寫的程式打包成jar包,然後呼叫Spark-Submit 指令碼做任務的提交

2、啟動driver做任務的初始化

3、Driver會將任務極其引數(corememorydriver相關的引數)進行封裝成ApplicationDescript通過taskSchedulerImpl 提交給Master

4、Master接受到driver端註冊任務請求時,會將請求引數進行解析,並封裝成APP,然後進行持久化,並且加入到其任務佇列中的waitingAPPs

5、當輪到咱們提交的任務執行時,master會呼叫schedule()這個方法,做任務資源排程

6、Master將排程好的資源封裝成launchExecutor,傳送給指定的worker

7、Worker接收到傳送來的launchExecutor時,會將其解析並封裝成ExecutorRunner,然後呼叫start方法,啟動Executor

8、Executor啟動後,會向任務的Driver進行反向註冊

9、當屬於這個任務的所有executor啟動成功並反向註冊完之後,driver會結束SparkContext物件的初始化

10、sc 初始化成功後,意味著執行任務的基本環境已經準備好了,driver會繼續執行我們編寫好的程式碼

11、開始註冊初始的RDD,並且不斷的進行轉換操作,當觸發了一個action運算元時,意味著觸發了一個job,此時driver就會將RDD之間的依賴關係劃分成一個一個的stage,並將stage封裝成taskset,然後將taskset中的每個task進行序列化,封裝成launchtask,傳送給指定的executor執行

12、Executor接受到driver傳送過來的任務task,會對task進行反序列化,然後將對應的運算元(flatmapmapreduceByKey。。。。)作用在RDD分割槽上

六、RDD詳解

 1、什麼是RDD

RDDResilient Disttibuted Dataset)叫做彈性的分散式的資料集,是spark中最基本的資料抽象,它代表一個不可變,可分割槽,裡面的元素可平行計算的集合

 2RDD的特點:

自動容錯

位置感知性排程

伸縮性

 3RDD的屬性:

(1)、一組分片(partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度,使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值,預設值就是程式所分配到的CPU Core的數目

(2)、一個計算每個分割槽的函式。SparkRDD的計算是以分片為單位的,每個RDD都會實現computer函式以達到這個目的。Computer函式會對迭代器進行復合,不需要儲存每次計算的結果。

(3)、RDD之間的依賴關係RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。

(4)、一個partition,即RDD的分片函式。當前Spark中實現了兩種型別的分片函式,一個是基於hashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於key-valueRDD,才會有Partitioner,非key-valueRDDPartitioner的值是NonePartitioner函式不但決定了RDD本身的分片數量,也決定了partition RDD Shuffle輸出時的分片數量。

(5)、一個列表,儲存存取每個Partition的優先位置(preferred location。對於一個HDFD檔案來說。這個列表儲存的就是每個Partition所在的快的位置。按照“移動資料不如移動計算”的理念。Spark在進行任務排程的時候,會盡可能的將計算任務分配到所要處理資料塊的儲存位置。

4、RDD的建立:

   進行Spark核心程式設計時,首先要做的事就是建立一個初始的RDDSpark Core提供了三種建立RDD的方式:

(1)、使用程式中的集合建立RDD (呼叫parallelize()方法)

(2)、使用本地檔案建立RDD  (呼叫textFile()方法)

(3)、使用HDFD檔案建立RDD  (呼叫textFile()方法)

七、運算元

   1、什麼是運算元?

RDD中定義的作用在每一個RDD分片上的函式,可以對RDD中的資料進行轉換 和操作

   2RDD運算元的分類

(1)Transformation運算元,這類運算元變換不觸發提交作業(特點就是lazy特性)

返回的是一個RDD

(2)Action運算元,這類運算元會觸發SparkContext提交作業(觸發一個spark job的執行,從而觸發這個action之前所有的transformation的執行)

返回的是一個spark物件

   3、常用的Transformation運算元

八、RDD分割槽排序

  I、分割槽

兩種實現方式:coalesce  repartition(底層呼叫coalesce

coalesce(numPartitons,isShuffle)

第一個引數是重分割槽後的數量,第二個引數是是否進行shuffle

如果原來有N個分割槽,重分割槽後有M個分割槽

如果 M > N ,必須將第二引數設定為true(也就是進行shuffle,等價於 repartition(numPartitons)    如果是false將不起作用  

如果M < N

100-->10 重分割槽後的分割槽數比原來的小的多,那麼久需要使用shuffle,也即是設定為true

100-->90 重分割槽後的分割槽數和原來的差不多的,那麼就不需要使用shuffle,也就是設定為false

II、排序

sortBy(x => x)  這個運算元中帶有隱式轉換引數

x 能夠排序(比較大小),那麼這個類就必須有比較大小的功能,也就是實現了compareTo 或者compare

實現二次排序有兩種方法:

1、繼承Comparable 介面 或者 Ordered

2、隱式轉換:可以定義隱式轉換函式(Ordered)或者隱式轉換值(Ordering

九、自定義分割槽

自定義分割槽

要求:按照key將對應的value輸出到指定的分割槽中

解釋:自定義一個自定義分割槽類,繼承partitioner,實現他的兩個方法

      1numPartitions

      2getPartition

具體的功能根據專案的要求自定義實現,然後呼叫partitionBy方法,new出自定義的類,傳入引數即可

九、RDD持久化原理

1、持久化場景:對於一個rdd會被多次引用到,並且這個rdd計算過程複雜,計算時間特變耗時

2、如何進行持久化,呼叫rdd.persist方法或cache方法,cache方法底層就是呼叫persist方法

******************persist(StorageLevel.MEMORY_ONLY)*******************

如果對RDD做持久化,預設持久化級別是storageLevel.MEMORY_ONLY ,也就是持久化到記憶體中去,這種持久化級別是效率最快的,但是由於是純Java 物件,儲存到記憶體中,那麼記憶體可能儲存的數量就會較少

***************persist(StorageLevel.MEMORY_ONLY_SER)****************

如果當我們叢集資源有限時,那麼我們可以採用MEMORY_ONLY_SER,也就是將Java物件進行序列化之後持久到記憶體中去,這種持久化的好處是能夠持久化更多的資料到記憶體中,但是由於在持久化時需要序列化,取出來之後又需要反序列化這一過程,這個過程會消耗CPU計算資源,效能相對於MEMORY_ONLY 這種持久化級別來說稍微弱點,但是還是比較高效的

3、如何選擇RDD持久化策略?

Spark提供的多種持久化級別,主要是為了在CPU和記憶體消耗之間進行取捨,下面是一些通用的持久化級別的選擇建議:

  1)、優先使用MEMORY_ONLY,如果可以快取所有資料的話,那麼就使用這種策略,因為純記憶體速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作

  2)、如果MEMORY_ONLY策略,無法儲存所有資料的話,那麼使用MEMORY_ONLY_SER,將資料進行序列化儲存,純記憶體操作還是非常快的,只是要消耗CPU進行反序列化

  3)、如果需要進行快速的失敗恢復,那麼就選擇帶字尾為_2的策略,進行資料的備份,這樣在失敗時,就不需要重新計算了

  4、能不使用DISK相關的策略,就不要使用,有的時候,從磁碟讀取資料,還不如重新計算一次

十一、共享變數

1、共享變數分為兩種:廣播變數     累加器

廣播變數(broadcast

2、日常所遇問題

  因為每個task都需要拷貝這樣的一個副本到executor去執行,那麼我們可以想象一下,如果有1000 task在某個worker上執行,而這個副本有100M,那麼意味著我們需要拷貝100G的資料都到某個worker上執行,這樣的話會大大消耗我們的網路流量,同時會加大executor的記憶體消耗,從而增加了我們spark作業的執行時間,大大降低了spark作業的執行效率,增加了作業失敗的概率

3、如何解決以上問題,也就是說什麼時候使用廣播變數?

  RDD引用到了一個外部變數並且這個外部變數資料量不小,同時這個RDD對應的task數量特別多,那麼此時使用廣播共享變數再合適不過了

  我們可以將這種大的外部變數做成廣播變數,外部變數做成廣播變數的時候,那麼每個executor的記憶體中只會有一個外部變數而這個副本針對所有的task都是共享的,這樣的話就減少了網路流量消耗,降低了executor的記憶體消耗,提高了spark作業執行效率和縮短了執行時間,同時降低了作業失敗的概率

4、廣播變數的使用流程:

   1)、某個executor的第一個task先執行,首先會從自己的blockManager中查詢外部變數,如果沒有就從鄰居的executorblockManager的記憶體中獲取這個外部變數,如果還是獲取不到,就從driver端獲取,拷貝這個外部變數到本地的executorblockManager

   2)、當這個executor的其他task執行時,就不需要從外面獲取這個外部變數的副本,直接從本地的blockManager中獲取即可

5、如何獲取廣播變數的值?

   可以直接呼叫廣播變數的value() 這個方法即可

【注意】廣播變數是隻讀的,不可寫

累加器(Accumulator

Spark提供的Accumulator ,主要用於多個節點對一個變數進行共享性的操作,Accumulator只是提供了累加的功能。但是卻給我們提供了多個task對一個變數並行操作的功能,但是task只能對Accumulator進行累加操作

【注意】task只能對Accumulator進行類加操作,只有Driver程式可以讀取Accumulator的值

RDD分割槽和容錯機制講解

1、RDD Lineage血統

   RDD只支援粗粒度轉換,即在大量記錄上執行的單個操作,將建立RDD的一系列Lineage(血統)記錄下來。以便恢復丟失的分割槽

2、RDD的依賴關係

   RDD和它的父RDD的關係有兩種不同的型別:

1)、窄依賴(一對一,多對一)

形象的比喻:獨生子女

2)、寬依賴(多對多)

形象的比喻:超生

註釋:劃分stage的依據就是寬依賴,也就是RDD之間是否有shuffleshuffle過程就是一個寬依賴過程,shuffle之前的tasks就屬於一個stageshuffle之後的也屬於一個stageshuffle之前和之後的操作都是窄依賴

【注意】shuffle過程分為:shuffle Write過程 和 shuffle read過程

4、DAG的生成(有向無環圖)和任務的劃分

   DAGDirected Acyclic Graph)叫做有向無環圖(有方向無迴圈的圖)

5、一個wordCount過程會產生多少個RDD

   至少會產生五個RDD

第一個,從HDFS中載入後得到一個RDD(即使用sc.textFile()運算元),即HadoopRDD

  sc.textFile()過程中還會產生一個RDD(呼叫map運算元),產生一個MapPartitionRDD

第二個,使用flatMap運算元,得到一個MapPartitionRDD

第三個,使用map運算元,得到一個MapPartitionRDD

第四個,使用reduceByKey運算元,也就是在經過了shuffle過程後又會得到一個shuffledRDD

第五個,使用saveAsTextFile運算元,再產生一個MapPartitionRDD 


spark程式提交流程講解

Spark任務簡介:

   Spark-submit--->SparkSubmit-->main-->submit-->doRunMain-->RunMain-->通過反射建立我們編寫的主類的例項物件,呼叫main方法-->開始執行我們編寫的程式碼-->初始化SparkContext物件-->建立初始的RDD-->觸發action運算元-->提交job-->worker執行任務-->任務結束

Spark任務詳解: 

  1)、將我們編寫的程式打成jar

  2)、呼叫spark-submit指令碼提交任務到叢集上執行

  3)、執行sparkSubmitmain方法,在這個方法中通過反射的方式建立我們編寫的主類的例項物件,然後呼叫main方法,開始執行我們的程式碼(注意,我們的spark程式中的driver就執行在sparkSubmit程序中)

  4)、當代碼執行到建立SparkContext物件時,那就開始初始化SparkContext物件了

  5)、在初始化SparkContext物件的時候,會建立兩個特別重要的物件,分別是:DAGScheduler

TaskScheduler

DAGScheduler的作用】將RDD的依賴切分成一個一個的stage,然後將stage作為taskSet提交給DriverActor

  6)、在構建taskScheduler的同時,會建立兩個非常重要的物件,分別是DriverActorClientActor

clientActor的作用】向master註冊使用者提交的任務

DriverActor的作用】接受executor的反向註冊,將任務提交給executor

  7)、clientActor啟動後,會將使用者提交的任務和相關的引數封裝到ApplicationDescription物件中,然後提交給master進行任務的註冊

  8)、master接受到clientActor提交的任務請求時,會將請求引數進行解析,並封裝成Application,然後將其持久化,然後將其加入到任務佇列waitingApps

  9)、當輪到我們提交的任務執行時,就開始呼叫schedule(),進行任務資源的排程

  10)、master將排程好的資源封裝到launchExecutor中傳送給指定的worker

  11)、worker接受到Maseter傳送來的launchExecutor時,會將其解壓並封裝到ExecutorRunner中,然後呼叫這個物件的start(), 啟動Executor

  12)、Executor啟動後會向DriverActor進行反向註冊

  13)、driverActor會發送註冊成功的訊息給Executor

  14)、Executor接受到DriverActor註冊成功的訊息後會建立一個執行緒池,用於執行DriverActor傳送過來的task任務

  15)、當屬於這個任務的所有的Executor啟動並反向註冊成功後,就意味著執行這個任務的環境已經準備好了,driver會結束SparkContext物件的初始化,也就意味著new SparkContext這句程式碼執行完成

  16)、當初始化sc成功後,driver端就會繼續執行我們編寫的程式碼,然後開始建立初始的RDD,然後進行一系列轉換操作,當遇到一個action