spark大資料架構初學入門基礎詳解
Spark是什麼
a) 是一種通用的大資料計算框架
b) Spark Core 離線計算
Spark SQL 互動式查詢
Spark Streaming 實時流式計算
Spark MLlib 機器學習
Spark GraphX 圖計算
c) 特點:
i. 一站式:一個技術堆疊解決大資料領域的計算問題
ii. 基於記憶體
d) Spark2009年誕生於伯克利大學的AMPLab實驗室
2010年正式開源了Spark專案
2013年Spark成為Apache下的專案
2014年飛速發展,成為Apache的頂級專案
2015年在國內興起,代替mr,hive,storm等
作者:辛湜(shi)
e) Spark和Hive:
Spark優點:
i. 速度快
ii. Spark SQL支援大量不同的資料來源
f) Spark 和Storm
i. 計算模型不一樣
ii. Spark吞吐量大
g) 特點:快,易用,通用,相容性
h) spark執行模式
i. local(本地)
ii. standalone(叢集)
iii. on yarn(由 yarn作為資源排程Spark負責任務排程和計算)
iv. on mesos(由mesos作為資源排程S)
v. on cloud()
i) 配置步驟
=======================on yarn====================
【說明】
1. spark任務執行在yarn上,由yarn來進行資源排程和管理,spark只負責任務的排程 和計算
2. 不需要配置和啟動spark叢集
3. 只需要在提交任務的節點上安裝並配置spark on yarn 模式
4. 必須找一臺節點安裝spark
5. 步驟:
i. 安裝配置JDK
ii. vi spark-env.sh
1. export JAVA_HOME=/opt/modules/jdk1.7_6.0
2. export HADOOP_CONF_DIR = /opt/modules/hadoop/etc/hadoop
iii. 測試spark on yarn 模式是否安裝成功
iv. 網路測試:
=====================sdandalone模式==============
【說明】
1. spark執行在spark 叢集上,由spark進行資源排程管理,同時還負責任務的排程和 計算
2. 需要配置和啟動spark叢集
3. 步驟:
i. 安裝配置JDK
ii. 上傳並解壓Spark
iii. 建立軟連線 ln -s spark spark 或者修改名稱
iv. 配置環境變數
v. 安裝配置Spark,修改spark配置檔案(spark-env.sh, slaves)
1. vi spark-env.sh
a) export JAVA_HOME=/opt/modules/jdk(jdk位置)
b) export SPARK_MASTER_IP=hadoop-yarn1.beicai.com
c) export SPARK_MASTER_PORT=7077
2. vi slaves(用於指定在哪些節點上啟動worker)
a) hadoop-yarn2.beicai.com
hadoop-yarn3.beicai.com
vi. 將spark傳送給其他主機
vii. 啟動
/opt/modules/spark/bin/start-all.sh
vii. 檢視SparkUI介面:http://hadoop-yarn1.beicai.com:8080
4.
j)
一、Spark原理
1、Spark的執行原理
i、分散式
Ii、主要基於記憶體(少數情況基於磁碟)
Iii、迭代式計算
2、Spark 計算模式 VS MapReduce 計算模式對比
Mr這種計算模型比較固定,只有兩種階段,map階段和reduce階段,兩個階段結束 後,任務就結束了,這意味著我們的操作很有限,只能在map階段和reduce階段, 也同時意味著可能需要多個mr任務才能處理完這個job
Spark 是迭代式計算,一個階段結束後,後面可以有多個階段,直至任務計算完 成,也就意味著我們可以做很多的操作,這就是Spark計算模型比mr 強大的地方
三、什麼是Spark RDD?
1、什麼是RDD?
彈性的,分散式的,資料集
(RDD在邏輯上可以看出來是代表一個HDFS上的檔案,他分為多個分割槽,散落 在Spark的多個節點上)
3、RDD----彈性
當RDD的某個分割槽的資料儲存到某個節點上,當這個節點的記憶體有限,儲存不了這個 分割槽的全部資料時,Spark就會有選擇性的將部分資料儲存到硬碟上,例如:當worker 的記憶體只能儲存20w條資料時,但是RDD的這個分割槽有30w條資料,這時候Spark就 會將多餘的10w條資料,儲存到硬碟上去。Spark的這種有選擇性的在記憶體和硬碟之間的權衡機制就是RDD的彈性特點所在
4、Spark的容錯性
RDD最重要的特性就是,提供了容錯性,可以自動的從失敗的節點上恢復過來,即如 果某個節點上的RDD partition(資料),因為節點的故障丟了,那麼RDD會自動的通過 自己的資料來源重新計算該partition,這一切對使用者來說是透明的
2、Spark的開發型別
(1)、核心開發:離線批處理 / 演示性的互動式資料處理
(2)、SQL查詢:底層都是RDD和計算操作
(3)、底層都是RDD和計算操作
(4)、機器學習
(5)、圖計算
3、Spark 核心開發(Spark-core == Spark-RDD)步驟
(1)、建立初始的RDD
(2)、對初始的RDD進行轉換操作形成新的RDD,然後對新的RDD再進行操作,直 至操作計算完成
(3)、將最後的RDD的資料儲存到某種介質中(hive、hdfs,MySQL、hbase...)
五、Spark原理
Driver,Master,Worker,Executor,Task各個節點之間的聯絡
Spark中的各節點的作用:
1、driver的作用:
(1)、 向master進行任務的註冊
(2)、構建執行任務的基本環境
(3)、接受該任務的executor的反向註冊
(4)、向屬於該任務的executor分配任務
2、什麼是driver?
我們編寫的程式打成jar包後,然後找一臺能夠連線spark叢集的節點做任務的driver,具體的表現為SparkSubmit
3、Master的作用:
(1)、監控叢集;
(2)、動態感知worker的上下線;
(3)、接受driver端註冊請求;
(4)、任務資源的排程
4、Worker的作用:
(1)、定時向master彙報狀態;
(2)、接受master資源排程命令,進行資源的排程
(3)、啟動任務的容器Executor
5、Executor的作用:
(1)、儲存計算的RDD分割槽資料;
(2)、向Driver反向註冊;
(3)、接受Driver端傳送來的任務Task,作用在RDD上進行執行
Spark 程式設計的流程:
1、我們編寫的程式打包成jar包,然後呼叫Spark-Submit 指令碼做任務的提交
2、啟動driver做任務的初始化
3、Driver會將任務極其引數(core,memory,driver相關的引數)進行封裝成ApplicationDescript通過taskSchedulerImpl 提交給Master
4、Master接受到driver端註冊任務請求時,會將請求引數進行解析,並封裝成APP,然後進行持久化,並且加入到其任務佇列中的waitingAPPs
5、當輪到咱們提交的任務執行時,master會呼叫schedule()這個方法,做任務資源排程
6、Master將排程好的資源封裝成launchExecutor,傳送給指定的worker
7、Worker接收到傳送來的launchExecutor時,會將其解析並封裝成ExecutorRunner,然後呼叫start方法,啟動Executor
8、Executor啟動後,會向任務的Driver進行反向註冊
9、當屬於這個任務的所有executor啟動成功並反向註冊完之後,driver會結束SparkContext物件的初始化
10、當sc 初始化成功後,意味著執行任務的基本環境已經準備好了,driver會繼續執行我們編寫好的程式碼
11、開始註冊初始的RDD,並且不斷的進行轉換操作,當觸發了一個action運算元時,意味著觸發了一個job,此時driver就會將RDD之間的依賴關係劃分成一個一個的stage,並將stage封裝成taskset,然後將taskset中的每個task進行序列化,封裝成launchtask,傳送給指定的executor執行
12、Executor接受到driver傳送過來的任務task,會對task進行反序列化,然後將對應的運算元(flatmap,map,reduceByKey。。。。)作用在RDD分割槽上
六、RDD詳解
1、什麼是RDD?
RDD(Resilient Disttibuted Dataset)叫做彈性的分散式的資料集,是spark中最基本的資料抽象,它代表一個不可變,可分割槽,裡面的元素可平行計算的集合
2、RDD的特點:
自動容錯
位置感知性排程
伸縮性
3、RDD的屬性:
(1)、一組分片(partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度,使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值,預設值就是程式所分配到的CPU Core的數目
(2)、一個計算每個分割槽的函式。Spark中RDD的計算是以分片為單位的,每個RDD都會實現computer函式以達到這個目的。Computer函式會對迭代器進行復合,不需要儲存每次計算的結果。
(3)、RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。
(4)、一個partition,即RDD的分片函式。當前Spark中實現了兩種型別的分片函式,一個是基於hashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於key-value的RDD,才會有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函式不但決定了RDD本身的分片數量,也決定了partition RDD Shuffle輸出時的分片數量。
(5)、一個列表,儲存存取每個Partition的優先位置(preferred location)。對於一個HDFD檔案來說。這個列表儲存的就是每個Partition所在的快的位置。按照“移動資料不如移動計算”的理念。Spark在進行任務排程的時候,會盡可能的將計算任務分配到所要處理資料塊的儲存位置。
4、RDD的建立:
進行Spark核心程式設計時,首先要做的事就是建立一個初始的RDD。Spark Core提供了三種建立RDD的方式:
(1)、使用程式中的集合建立RDD (呼叫parallelize()方法)
(2)、使用本地檔案建立RDD (呼叫textFile()方法)
(3)、使用HDFD檔案建立RDD (呼叫textFile()方法)
七、運算元
1、什麼是運算元?
是RDD中定義的作用在每一個RDD分片上的函式,可以對RDD中的資料進行轉換 和操作
2、RDD運算元的分類
(1)、Transformation運算元,這類運算元變換不觸發提交作業(特點就是lazy特性)
返回的是一個RDD
(2)、Action運算元,這類運算元會觸發SparkContext提交作業(觸發一個spark job的執行,從而觸發這個action之前所有的transformation的執行)
返回的是一個spark物件
3、常用的Transformation運算元
八、RDD分割槽排序
I、分割槽
兩種實現方式:coalesce 和 repartition(底層呼叫coalesce)
coalesce(numPartitons,isShuffle)
第一個引數是重分割槽後的數量,第二個引數是是否進行shuffle
如果原來有N個分割槽,重分割槽後有M個分割槽
如果 M > N ,必須將第二引數設定為true(也就是進行shuffle),等價於 repartition(numPartitons) 如果是false將不起作用
如果M < N
100-->10 重分割槽後的分割槽數比原來的小的多,那麼久需要使用shuffle,也即是設定為true
100-->90 重分割槽後的分割槽數和原來的差不多的,那麼就不需要使用shuffle,也就是設定為false
II、排序
sortBy(x => x) 這個運算元中帶有隱式轉換引數
x 能夠排序(比較大小),那麼這個類就必須有比較大小的功能,也就是實現了compareTo 或者compare
實現二次排序有兩種方法:
1、繼承Comparable 介面 或者 Ordered
2、隱式轉換:可以定義隱式轉換函式(Ordered)或者隱式轉換值(Ordering)
九、自定義分割槽
自定義分割槽
要求:按照key將對應的value輸出到指定的分割槽中
解釋:自定義一個自定義分割槽類,繼承partitioner,實現他的兩個方法
1、numPartitions
2、getPartition
具體的功能根據專案的要求自定義實現,然後呼叫partitionBy方法,new出自定義的類,傳入引數即可
九、RDD持久化原理
1、持久化場景:對於一個rdd會被多次引用到,並且這個rdd計算過程複雜,計算時間特變耗時
2、如何進行持久化,呼叫rdd.persist方法或cache方法,cache方法底層就是呼叫persist方法
******************persist(StorageLevel.MEMORY_ONLY)*******************
如果對RDD做持久化,預設持久化級別是storageLevel.MEMORY_ONLY ,也就是持久化到記憶體中去,這種持久化級別是效率最快的,但是由於是純Java 物件,儲存到記憶體中,那麼記憶體可能儲存的數量就會較少
***************persist(StorageLevel.MEMORY_ONLY_SER)****************
如果當我們叢集資源有限時,那麼我們可以採用MEMORY_ONLY_SER,也就是將Java物件進行序列化之後持久到記憶體中去,這種持久化的好處是能夠持久化更多的資料到記憶體中,但是由於在持久化時需要序列化,取出來之後又需要反序列化這一過程,這個過程會消耗CPU計算資源,效能相對於MEMORY_ONLY 這種持久化級別來說稍微弱點,但是還是比較高效的
3、如何選擇RDD持久化策略?
Spark提供的多種持久化級別,主要是為了在CPU和記憶體消耗之間進行取捨,下面是一些通用的持久化級別的選擇建議:
1)、優先使用MEMORY_ONLY,如果可以快取所有資料的話,那麼就使用這種策略,因為純記憶體速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作
2)、如果MEMORY_ONLY策略,無法儲存所有資料的話,那麼使用MEMORY_ONLY_SER,將資料進行序列化儲存,純記憶體操作還是非常快的,只是要消耗CPU進行反序列化
3)、如果需要進行快速的失敗恢復,那麼就選擇帶字尾為_2的策略,進行資料的備份,這樣在失敗時,就不需要重新計算了
4、能不使用DISK相關的策略,就不要使用,有的時候,從磁碟讀取資料,還不如重新計算一次
十一、共享變數
1、共享變數分為兩種:廣播變數 和 累加器
廣播變數(broadcast)
2、日常所遇問題
因為每個task都需要拷貝這樣的一個副本到executor去執行,那麼我們可以想象一下,如果有1000 個task在某個worker上執行,而這個副本有100M,那麼意味著我們需要拷貝100G的資料都到某個worker上執行,這樣的話會大大消耗我們的網路流量,同時會加大executor的記憶體消耗,從而增加了我們spark作業的執行時間,大大降低了spark作業的執行效率,增加了作業失敗的概率
3、如何解決以上問題,也就是說什麼時候使用廣播變數?
當RDD引用到了一個外部變數並且這個外部變數資料量不小,同時這個RDD對應的task數量特別多,那麼此時使用廣播共享變數再合適不過了
我們可以將這種大的外部變數做成廣播變數,外部變數做成廣播變數的時候,那麼每個executor的記憶體中只會有一個外部變數,而這個副本針對所有的task都是共享的,這樣的話就減少了網路流量消耗,降低了executor的記憶體消耗,提高了spark作業執行效率和縮短了執行時間,同時降低了作業失敗的概率
4、廣播變數的使用流程:
1)、某個executor的第一個task先執行,首先會從自己的blockManager中查詢外部變數,如果沒有就從鄰居的executor的blockManager的記憶體中獲取這個外部變數,如果還是獲取不到,就從driver端獲取,拷貝這個外部變數到本地的executor的blockManager
2)、當這個executor的其他task執行時,就不需要從外面獲取這個外部變數的副本,直接從本地的blockManager中獲取即可
5、如何獲取廣播變數的值?
可以直接呼叫廣播變數的value() 這個方法即可
【注意】廣播變數是隻讀的,不可寫
累加器(Accumulator)
Spark提供的Accumulator ,主要用於多個節點對一個變數進行共享性的操作,Accumulator只是提供了累加的功能。但是卻給我們提供了多個task對一個變數並行操作的功能,但是task只能對Accumulator進行累加操作
【注意】task只能對Accumulator進行類加操作,只有Driver程式可以讀取Accumulator的值
RDD分割槽和容錯機制講解1、RDD 的Lineage血統
RDD只支援粗粒度轉換,即在大量記錄上執行的單個操作,將建立RDD的一系列Lineage(血統)記錄下來。以便恢復丟失的分割槽
2、RDD的依賴關係
RDD和它的父RDD的關係有兩種不同的型別:
1)、窄依賴(一對一,多對一)
形象的比喻:獨生子女
2)、寬依賴(多對多)
形象的比喻:超生
註釋:劃分stage的依據就是寬依賴,也就是RDD之間是否有shuffle,shuffle過程就是一個寬依賴過程,shuffle之前的tasks就屬於一個stage,shuffle之後的也屬於一個stage,shuffle之前和之後的操作都是窄依賴
【注意】shuffle過程分為:shuffle Write過程 和 shuffle read過程
4、DAG的生成(有向無環圖)和任務的劃分
DAG(Directed Acyclic Graph)叫做有向無環圖(有方向無迴圈的圖)
5、一個wordCount過程會產生多少個RDD?
至少會產生五個RDD,
第一個,從HDFS中載入後得到一個RDD(即使用sc.textFile()運算元),即HadoopRDD
在sc.textFile()過程中還會產生一個RDD(呼叫map運算元),產生一個MapPartitionRDD
第二個,使用flatMap運算元,得到一個MapPartitionRDD
第三個,使用map運算元,得到一個MapPartitionRDD
第四個,使用reduceByKey運算元,也就是在經過了shuffle過程後又會得到一個shuffledRDD
第五個,使用saveAsTextFile運算元,再產生一個MapPartitionRDD
spark程式提交流程講解
Spark任務簡介:
Spark-submit--->SparkSubmit-->main-->submit-->doRunMain-->RunMain-->通過反射建立我們編寫的主類的例項物件,呼叫main方法-->開始執行我們編寫的程式碼-->初始化SparkContext物件-->建立初始的RDD-->觸發action運算元-->提交job-->worker執行任務-->任務結束
Spark任務詳解:
1)、將我們編寫的程式打成jar包
2)、呼叫spark-submit指令碼提交任務到叢集上執行
3)、執行sparkSubmit的main方法,在這個方法中通過反射的方式建立我們編寫的主類的例項物件,然後呼叫main方法,開始執行我們的程式碼(注意,我們的spark程式中的driver就執行在sparkSubmit程序中)
4)、當代碼執行到建立SparkContext物件時,那就開始初始化SparkContext物件了
5)、在初始化SparkContext物件的時候,會建立兩個特別重要的物件,分別是:DAGScheduler
和TaskScheduler
【DAGScheduler的作用】將RDD的依賴切分成一個一個的stage,然後將stage作為taskSet提交給DriverActor
6)、在構建taskScheduler的同時,會建立兩個非常重要的物件,分別是DriverActor和ClientActor
【clientActor的作用】向master註冊使用者提交的任務
【DriverActor的作用】接受executor的反向註冊,將任務提交給executor
7)、當clientActor啟動後,會將使用者提交的任務和相關的引數封裝到ApplicationDescription物件中,然後提交給master進行任務的註冊
8)、當master接受到clientActor提交的任務請求時,會將請求引數進行解析,並封裝成Application,然後將其持久化,然後將其加入到任務佇列waitingApps中
9)、當輪到我們提交的任務執行時,就開始呼叫schedule(),進行任務資源的排程
10)、master將排程好的資源封裝到launchExecutor中傳送給指定的worker
11)、worker接受到Maseter傳送來的launchExecutor時,會將其解壓並封裝到ExecutorRunner中,然後呼叫這個物件的start(), 啟動Executor
12)、Executor啟動後會向DriverActor進行反向註冊
13)、driverActor會發送註冊成功的訊息給Executor
14)、Executor接受到DriverActor註冊成功的訊息後會建立一個執行緒池,用於執行DriverActor傳送過來的task任務
15)、當屬於這個任務的所有的Executor啟動並反向註冊成功後,就意味著執行這個任務的環境已經準備好了,driver會結束SparkContext物件的初始化,也就意味著new SparkContext這句程式碼執行完成
16)、當初始化sc成功後,driver端就會繼續執行我們編寫的程式碼,然後開始建立初始的RDD,然後進行一系列轉換操作,當遇到一個action