1. 程式人生 > >spark基本概念及入門

spark基本概念及入門

spark

spark背景

什麼是spark

Spark是一種快速、通用、可擴充套件的大資料分析引擎,2009年誕生於加州大學伯克利分校AMPLab,2010年開源,2013年6月成為Apache孵化專案,2014年2月成為Apache頂級專案。目前,Spark生態系統已經發展成為一個包含多個子專案的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子專案,Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者將Spark部署在大量廉價硬體之上,形成叢集。

Spark與Hadoop

Spark是一個計算框架,而Hadoop中包含計算框架MapReduce和分散式檔案系統HDFS,Hadoop更廣泛地說還包括在其生態系統上的其他系統.

為什麼使用Spark?

Hadoop的MapReduce計算模型存在問題:
Hadoop的MapReduce的核心是Shuffle(洗牌).在整個Shuffle的過程中,至少產生6次I/O流.基於MapReduce計算引擎通常會將結果輸出到次盤上,進行儲存和容錯.另外,當一些查詢(如:hive)翻譯到MapReduce任務是,往往會產生多個Stage,而這些Stage有依賴底層檔案系統來儲存每一個Stage的輸出結果,而I/O的效率往往較低,從而影響MapReduce的執行速度.

Spark的特點: 快, 易用, 通用,相容性

  • 快:與Hadoop的MapReduce相比,Spark基於記憶體的運算要快100倍以上,基於硬碟的運算也要快10倍以上。Spark實現了高效的DAG執行引擎,可以通過基於記憶體來高效處理資料流。
  • 易用:Spark支援Java、Python和Scala的API,還支援超過80種高階演算法,使使用者可以快速構建不同的應用。而且Spark支援互動式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark叢集來驗證解決問題的方法。
  • 通用:Spark提供了統一的解決方案。Spark可以用於批處理、互動式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同型別的處理都可以在同一個應用中無縫使用。Spark統一的解決方案非常具有吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減少開發和維護的人力成本和部署平臺的物力成本。
  • 相容性:Spark 可以非常方便地與其他的開源產品進行融合。比如,Spark 可以使用Hadoop 的 YARN 和 Apache Mesos 作為它的資源管理和排程器.並且可以處理所有 Hadoop 支援的資料,包括 HDFS、HBase 和 Cassandra 等。這對於已經部署Hadoop 叢集的使用者特別重要,因為不需要做任何資料遷移就可以使用 Spark 的強大處理能力。Spark 也可以不依賴於第三方的資源管理和排程器,它實現了Standalone 作為其內建的資源管理和排程框架,這樣進一步降低了 Spark 的使用門檻,使得所有人都可以非常容易地部署和使用 Spark。此外,Spark 還提供了在EC2 上部Standalone 的 Spark 叢集的工具。

Spark的生態系統

  • 1.Spark Streaming:
    Spark Streaming基於微批量方式的計算和處理,可以用於處理實時的流資料.它使用DStream,簡單來說是一個彈性分散式資料集(RDD)系列,處理實時資料.資料可以從Kafka,Flume,Kinesis或TCP套接字等眾多來源獲取,並且可以使用由高階函式(如 map,reduce,join 和 window)開發的複雜演算法進行流資料處理。最後,處理後的資料可以被推送到檔案系統,資料庫和實時儀表板。
  • 2.Spark SQL
    SPark SQL可以通過JDBC API將Spark資料集暴露出去,而且還可以用傳統的BI和視覺化工具在Spark資料上執行類似SQL的查詢,使用者哈可以用Spark SQL對不同格式的資料(如Json, Parque以及資料庫等)執行ETl,將其轉化,然後暴露特定的查詢.
  • 3.Spark MLlib
    MLlib是一個可擴充套件的Spark機器學習庫,由通用的學習演算法和工具組成,包括二元分類、線性迴歸、聚類、協同過濾、梯度下降以及底層優化原語。
  • 4.Spark Graphx:
    GraphX是用於圖計算和並行圖計算的新的(alpha)Spark API。通過引入彈性分散式屬性圖(Resilient Distributed Property Graph),一種頂點和邊都帶有屬性的有向多重圖,擴充套件了Spark RDD。為了支援圖計算,GraphX暴露了一個基礎操作符集合(如subgraph,joinVertices和aggregateMessages)和一個經過優化的Pregel API變體。此外,GraphX還包括一個持續增長的用於簡化圖分析任務的圖演算法和構建器集合。
  • 5.Tachyon
    Tachyon是一個以記憶體為中心的分散式檔案系統,能夠提供記憶體級別速度的跨叢集框架(如Spark和mapReduce)的可信檔案共享.它將工作集檔案快取在記憶體中,從而避免到磁碟中載入需要經常讀取的資料集,通過這一機制,不同的作業/查詢和框架可以記憶體級的速度訪問快取檔案.
    此外,還有一些用於與其他產品整合的介面卡,如Cassandra(Spark Cassandra 聯結器)和R(SparkR)。Cassandra Connector可用於訪問儲存在Cassandra資料庫中的資料並在這些資料上執行資料分析。
  • 6.Mesos
    Mesos是一個資源管理框架
    提供類似於YARN的功能
    使用者可以在其中外掛式地執行Spark,MapReduce,Tez等計算框架任務
    Mesos對資源和任務進行隔離,並實現高效的資源任務排程
  • 7.BlinkDB
    BlinkDB是一個用於在海量資料上進行互動式SQL的近似查詢引擎
    允許使用者通過查詢準確性和查詢時間之間做出權衡,完成近似查詢
    核心思想:通過一個自適應優化框架,隨著時間的推移,從原始資料建立並維護一組多維樣本,通過一個動態樣本選擇策略,選擇一個適當大小的示例,然後基於查詢的準確性和響應時間滿足使用者查詢需求

除了這些庫意外,還有一些其他的庫,如Blink和Tachyon.
BlinkDB是一個近似查詢 引擎,用於海量資料執行互動式SQL查詢.BlinkDB可以通過犧牲資料精度來提升查詢響應時間.通過在資料樣本上執行查詢並展示包含有意義的錯誤線註解的結果,操作大資料集合.

Spark架構採用了分散式計算中的Master-Slave模型。Master是對應叢集中的含有Master程序的節點,Slave是叢集中含有Worker程序的節點。Master作為整個叢集的控制器,負責整個叢集的正常執行;Worker相當於是計算節點,接收主節點命令與進行狀態彙報;Executor負責任務的執行;Client作為使用者的客戶端負責提交應用,Driver負責控制一個應用的執行.
Spark叢集部署後,需要在主節點和從節點分別啟動master程序和Worker程序,對整個叢集進行控制.在一個Spark應用的執行程式中.Driver和Worker是兩個重要的角色.Driver程式是應用邏輯執行的起點,負責作業的排程,即Task任務的釋出,而多個Worker用來管理計算節點和建立Executor並行處理任務.在執行階段,Driver會將Task和Task所依賴的file和jar序列化後傳遞給對應的Worker機器.同時Executor對相應資料分割槽的任務進行處理.

Sparkde架構中的基本元件:

  • ClusterManager:在standlone模式中即為Master(主節點),控制整個叢集.監控Worker.在Yarn模式中為資源管理器.
  • Worker:從節點,負責控制計算節點,啟動Ex而粗投入或Driver
  • NodeManager:負責計算節點的控制。
  • Driver:執行Application的main() 函式並建立SparkContext
  • Executor: 執行器,在worker node上執行任務元件,用於啟動執行緒執行任務.每個Application擁有獨立的一組Executors
  • SparkContext: 整個應用的上下文,監控應用的生命週期
  • RDD:彈性分散式集合,spark的基本計算單元,一組RDD可形成執行的有向無環圖RDD Graph
  • DAG Scheduler: 根據作業(Job)構建基於Stage的DAG,並交給Stage給TaskScheduler
  • TaskScheduler:將任務(Task)分發給Executor執行
  • SparkEnv:執行緒級別的上下文,儲存執行時的重要元件的引用。SparkEnv內建立幷包含如下一些重要元件的引用。
  • MapOutPutTracker:負責Shuffle元資訊的儲存。
  • BroadcastManager:負責廣播變數的控制與元資訊的儲存。
  • BlockManager:負責儲存管理、建立和查詢塊。
  • MetricsSystem:監控執行時效能指標資訊。
  • SparkConf:負責儲存配置資訊。
  • Spark的整體流程:client提交應用,Master找到一個Worker啟動Driver,Driver向Master或者向資源管理器申請資源,之後將應用轉化為RDD Graph,再由DAGScheduler將RDD Graph轉化為Stage的有向無環圖提交給TaskScheduler,由TaskScheduler提交任務給Executor執行。在任務執行的過程中,其他元件協同工作,確保整個應用順利執行。

搭建spark叢集

安裝java環境,spark自動會把scala SDK打包到spark中無需安裝scala環境

配置spark

$ cp $SPARK_HOME/conf/spark-env.sh.template spark-env.sh
$ vim $SPARK_HOME/conf/spark-env.sh

新增
export JAVA_HOME=/usr/java/jdk1.8.0_191

#export SPARK_MASTER_IP=node-1
#export SPARK_MASTER_PORT=7077
$ cp $SPARK_HOME/conf/slaves.template slaves

$ vi slaves
# 在該檔案中新增子節點所在的位置(Worker節點)
node-2
node-3
node-4

啟動spark叢集

$SPARK_HOME/sbin/start-master.sh

$SPARK_HOME/sbin/start-slaves.sh

啟動後執行jps命令,主節點上有Master程序,其他子節點上有Work進行,登入Spark管理介面檢視叢集狀態(主節點):http://node-1:8080/

到此為止,Spark叢集安裝完畢,但是有一個很大的問題,那就是Master節點存在單點故障,要解決此問題,就要藉助zookeeper,並且啟動至少兩個Master節點來實現高可靠,配置方式比較簡單:

Spark叢集規劃:node-1,node-2是Master;node-3,node-4,node-5是Worker

安裝配置zk叢集,並啟動zk叢集

停止spark所有服務,修改配置檔案spark-env.sh,在該配置檔案中刪掉SPARK_MASTER_IP並新增如下配置

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1,zk2,zk3 -Dspark.deploy.zookeeper.dir=/spark"

1.在node1節點上修改slaves配置檔案內容指定worker節點

2.在node1上執行$SPARK_HOME/sbin/start-all.sh,然後在node2上執行$SPARK_HOME/sbin/start-master.sh啟動第二個Master

執行第一個spark程式

$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://localhost:7077 --executor-memory 1G --total-executor-cores 1 $SPARK_HOME/examples/jars/spark-examples_2.11-2.2.2.jar 100

spark Shell

spark-shell是Spark自帶的互動式Shell程式,方便使用者進行互動式程式設計,使用者可以在該命令列下用scala編寫spark程式。

$SPARK_HOME/bin/spark-shell \

--master spark://localhost:7077 \

--executor-memory 2g \

--total-executor-cores 2

引數說明:

--master spark://localhost:7077 指定Master的地址

--executor-memory 2g 指定每個worker可用記憶體為2G

--total-executor-cores 2 指定整個叢集使用的cup核數為2個

注意:

如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程式,其實是啟動了spark的local模式,該模式僅在本機啟動一個程序,沒有與叢集建立聯絡。Spark Shell中已經預設將SparkContext類初始化為物件sc。使用者程式碼如果需要用到,則直接應用sc即可

spark shell中編寫WordCount

在spark shell中用scala語言編寫spark程式

sc.textFile("file:///root/tmp/words.dta").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("file:///root/tmp/out")

說明:

sc是SparkContext物件,該物件時提交spark程式的入口

textFile("file:///root/tmp/words.dta") 從本地檔案中讀取資料

flatMap(_.split(" ")) 先map在壓平

map((_,1)) 將單詞和1構成元組

reduceByKey(+) 按照key進行reduce,並將value累加

saveAsTextFile("file:///root/tmp/out") 將結果寫入到指定位置

spark RDD

RDD概述

什麼是RDD

RDD(Resilient Distributed Dataset)叫做分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。RDD具有資料流模型的特點:自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。

RDD的屬性

  • 一組分片(Partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。

  • 一個計算每個分割槽的函式。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。

  • RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。

  • 一個Partitioner,即RDD的分片函式。當前Spark中實現了兩種型別的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

  • 一個列表,儲存存取每個Partition的優先位置(preferred location)。對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。

建立RDD

  • 由一個已經存在的Scala集合建立。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  • 由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile("hdfs://localhost:9000/wc/words.txt")

RDD程式設計模型

Transformation

RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。

常用的Transformation:

轉換 含義
map(func) 返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成
filter(func) 返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成
flatMap(func) 類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func) 類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是 (Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據fraction指定的比例對資料進行取樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 對源RDD和引數RDD求並集後返回一個新的RDD
intersection(otherDataset) 對源RDD和引數RDD求交集後返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似,但是更靈活
join(otherDataset, [numTasks]) 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個(K,(Iterable
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

Action

動作 含義
reduce(func) 通過func函式聚集RDD中的所有元素,這個功能必須是課交換且可並聯的
collect() 在驅動程式中,以陣列的形式返回資料集的所有元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(類似於take(1))
take(n) 返回一個由資料集的前n個元素組成的陣列
takeSample(withReplacement,num, [seed]) 返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字
saveAsSequenceFile(path) 將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。
saveAsObjectFile(path)
countByKey() 針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在資料集的每一個元素上,執行函式func進行更新。

RDD的依賴關係

RDD和它依賴的父RDD(s)的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

shuffle重要的依據:父RDD的一個分割槽的資料,要給子RDD的多個分割槽

窄依賴

窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用

總結:窄依賴我們形象的比喻為獨生子女

寬依賴

寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition

總結:窄依賴我們形象的比喻為超生

Lineage

RDD只支援粗粒度轉換,即在大量記錄上執行的單個操作。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分割槽。RDD的Lineage會記錄RDD的元資料資訊和轉換行為,當該RDD的部分分割槽資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的資料分割槽。

RDD的快取

RDD通過persist方法或cache方法可以將前面的計算結果快取,但是並不是這兩個方法被呼叫時立即快取,而是觸發後面的action時,該RDD將會被快取在計算節點的記憶體中,並供後面重用。

cache最終也是呼叫了persist方法,預設的儲存級別都是僅在記憶體儲存一份,Spark的儲存級別還有好多種,儲存級別在object StorageLevel中定義的。