1. 程式人生 > >Spark(二) :基本架構解析

Spark(二) :基本架構解析

1,spark基礎及體系架構

1.1 spark why?

  Apache Spark是一個圍繞速度、易用性和複雜分析構建的大資料處理框架,最初在2009年由加州大學伯克利分校的AMPLab開發,並於2010年成為Apache的開源專案之一,與Hadoop和Storm等其他大資料和MapReduce技術相比,Spark有如下優勢

  1. Spark提供了一個全面、統一的框架用於管理各種有著不同性質(文字資料、圖表資料等)的資料集和資料來源(批量資料或實時的流資料)的大資料處理的需求
  2. 官方資料介紹Spark可以將Hadoop叢集中的應用在記憶體中的執行速度提升100倍,甚至能夠將應用在磁碟上的執行速度提升10倍

Spark  VS  MapReduce 迭代計算:

有多MapReduce任務串聯時,依HDFS儲存中間結果的輸出。MapReduce在處理複雜DAG時會帶來大量的資料copy、序列化和磁碟I/O開銷;

spark處理速度則非常快:

  1. Spark儘可能減少中間結果寫入磁碟
  2. 儘可能減少不必要Sort/Shuffle
  3. 反覆用到的資料進Cache
  4. 對於DAG進行高度優化
  5. 劃分不同Stage
  6. 使用延遲計算技術

1.2  架構及生態

  • 通常當需要處理的資料量超過了單機尺度(比如我們的計算機有4GB的記憶體,而我們需要處理100GB以上的資料)這時我們可以選擇spark叢集進行計算,有時我們可能需要處理的資料量並不大,但是計算很複雜,需要大量的時間,這時我們也可以選擇利用spark叢集強大的計算資源,並行化地計算,其架構示意圖
    如下:

  • Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的
  • Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行互動的API。每個資料庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。
  • Spark Streaming:對實時資料流進行處理和控制。Spark Streaming允許程式能夠像普通RDD一樣處理實時資料
  • MLlib:一個常用機器學習演算法庫,演算法被實現為對RDD的Spark操作。這個庫包含可擴充套件的學習演算法,比如分類、迴歸等需要對大量資料集進行迭代的操作。
  • GraphX:控制圖、並行圖操作和計算的一組演算法和工具的集合。GraphX擴充套件了RDD API,包含控制圖、建立子圖、訪問路徑上所有頂點的操作

Spark架構的組成圖如下

  • Cluster Manager:在standalone模式中即為Master主節點,控制整個叢集,監控worker。在YARN模式中為資源管理器
  • Worker節點:從節點,負責控制計算節點,啟動Executor或者Driver。
  • Driver: 執行Application 的main()函式
  • Executor:執行器,是為某個Application執行在worker node上的一個程序

1.3 spark執行流程

術語解釋

  • Application: Appliction都是指使用者編寫的Spark應用程式,其中包括一個Driver功能的程式碼和分佈在叢集中多個節點上執行的Executor程式碼
  • Driver:  Spark中的Driver即執行上述Application的main函式並建立SparkContext,建立SparkContext的目的是為了準備Spark應用程式的執行環境,在Spark中有SparkContext負責與ClusterManager通訊,進行資源申請、任務的分配和監控等,當Executor部分執行完畢後,Driver同時負責將SparkContext關閉,通常用SparkContext代表Driver
  • Executor:  某個Application執行在worker節點上的一個程序,  該程序負責執行某些Task, 並且負責將資料存到記憶體或磁碟上,每個Application都有各自獨立的一批Executor, 在Spark on Yarn模式下,其程序名稱為CoarseGrainedExecutor Backend。一個CoarseGrainedExecutor Backend有且僅有一個Executor物件, 負責將Task包裝成taskRunner,並從執行緒池中抽取一個空閒執行緒執行Task, 這個每一個oarseGrainedExecutor Backend能並行執行Task的數量取決與分配給它的cpu個數
  • Cluter Manager:指的是在叢集上獲取資源的外部服務。目前有三種類型
  1. Standalon : spark原生的資源管理,由Master負責資源的分配
  2. Apache Mesos:與hadoop MR相容性良好的一種資源排程框架
  3. Hadoop Yarn: 主要是指Yarn中的ResourceManager
  • Worker: 叢集中任何可以執行Application程式碼的節點,在Standalone模式中指的是通過slave檔案配置的Worker節點,在Spark on Yarn模式下就是NoteManager節點
  • Task: 被送到某個Executor上的工作單元,但hadoopMR中的MapTask和ReduceTask概念一樣,是執行Application的基本單位,多個Task組成一個Stage,而Task的排程和管理等是由TaskScheduler負責
  • Job: 包含多個Task組成的平行計算,往往由Spark Action觸發生成, 一個Application中往往會產生多個Job
  • Stage: 每個Job會被拆分成多組Task, 作為一個TaskSet, 其名稱為Stage,Stage的劃分和排程是有DAGScheduler來負責的,Stage有非最終的Stage(Shuffle Map Stage)和最終的Stage(Result Stage)兩種,Stage的邊界就是發生shuffle的地方
  • DAGScheduler: 根據Job構建基於Stage的DAG(Directed Acyclic Graph有向無環圖),並提交Stage給TASkScheduler。 其劃分Stage的依據是RDD之間的依賴的關係找出開銷最小的排程方法,如下圖

  • TASKSedulter: 將TaskSET提交給worker執行,每個Executor執行什麼Task就是在此處分配的. TaskScheduler維護所有TaskSet,當Executor向Driver發生心跳時,TaskScheduler會根據資源剩餘情況分配相應的Task。另外TaskScheduler還維護著所有Task的執行標籤,重試失敗的Task。下圖展示了TaskScheduler的作用

  • 在不同執行模式中任務排程器具體為:
  1. Spark on Standalone模式為TaskScheduler
  2. YARN-Client模式為YarnClientClusterScheduler
  3. YARN-Cluster模式為YarnClusterScheduler
  • 將這些術語串起來的執行層次圖如下:

Job=多個stage,Stage=多個同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency

1.4 RDD及Stage

RDD:

Spark將資料快取在分散式記憶體。彈性分散式資料集。

如何做實現?RDD:

  1. Spark的核心
  2. 分散式記憶體抽象
  3. 提供了一個高度受限的共享記憶體模型
  4. 邏輯上集中但是物理上是儲存在叢集的多臺機器上

RDD特性:

只讀

  1. 通過HDFS或者其它持久化系統建立RDD
  2. 通過transformation將父RDD轉化得到新的RDD
  3. RDD上儲存著前後之間依賴關係

Partition

  1. 基本組成單位,RDD在邏輯上按照Partition分塊
  2. 分佈在各個節點上
  3. 分片數量決定平行計算的粒度
  4. RDD中儲存如何計算每一個分割槽的函式

容錯

  1. 失敗自動重建
  2. 如果發生部分分割槽資料丟失,可以通過依賴關係重 新計算

寬窄依賴及Stage

窄依賴:沒有資料shuffling;所有父RDD中的Partition均會一一對映到子RDDPartition

寬依賴:有資料shuffling ;所有父RDD中的Partition會被切分,根據key的不同劃分到子RDD的Partition中

Stage : 一個Job會被拆分為多組Task,每組Task被稱為一個Stage

 劃分依據: 以shuffle操作作為邊界,遇到一個寬依賴就分一stage

Stage優化:

  1. 對窄依賴可以進行流水(pipeline)優化
  2. 不互相依賴的Stage可以並行執行
  3. 存在依賴的Stage必須在依賴的Stage執行完之後才能執行
  4. Stage並行執行程度取決於資源數

1.5 spark on yarn

 yarn資源排程過程

sprark on yarn:

Yarn

  1. ResourceManager:負責整個叢集資源管理和分配
  2. ApplicationMaster:Yarn中每個Application對應一個AM,負責與ResrouceManager協商獲取資源,並告知NodeManager分配啟Container
  3. NodeManager:每個節點的資源和工作管理員,負責啟Container,並監 視資源使用情況
  4. Container:資源抽象

Spark

  1. Application:使用者自己編寫的Spark程式
  2. Driver:執行Application的main函式並建立SparkContext,和ClusterManager通訊申請資源,任務分配並監控執行情況
  3. ClusterManager:指的是Yarn
  4. DAGScheduler:DAG圖劃分Stage
  5. TaskScheduler:TaskSet分配給具體的Executor

Spark支援三種執行模式 :standalon, yarn-cluster, yarn-client

處理流程:

 

2,spark程式設計模型

2.1 spark程式設計核心思想

   函數語言程式設計特點:函式為一等公民;資料對映;變數不可變;沒有副作用。

2.2 spark對於RDD有四種類型運算元

1)Create

  • SparkContext.textFile()
  • SparkContext.parallelize()

2)Transformation

  • 作用於一個或者多個RDD,出轉RDD。例如:map, filter, groupBy

3)Action

  • 會觸發Spark提交作業,並將果返Driver Program 。例如:reduce, countByKey

4)Cache

  • cache 快取
  • persist 持久化

惰性運算:遇到Action時才會真正的執行

訪問官方文件:https://spark.apache.org/docs/1.6.0/

2.3 Value型別 Transformation 運算元分類

2.3.1Transformation-map

map

  • def map[U](f: (T)  U)(implicit arg0: ClassTag[U]):RDD[U]
  • 生成一個新的RDD,新的RDD中每個元素均有父RDD通 過作用func函式對映變換而來
  • 新的RDD叫做MappedRDD

Example:

  • val rd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)  val rd2 = rd1.map(x => x * 2)
  • rd2.collect()
  • rd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at  parallelize
  • rd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map  res1: Array[Int] = Array(2, 4, 6, 8, 10, 12)

2.3.2 Transformation-mapPartitions
mapPartitions

  • def mapPartitions[U](f: (Iterator[T]) => Iterator[U],  preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]):  RDD[U]
  • 獲取到每個分割槽的迭代器
  • 對每個分割槽中每個元素進行操作

Example

  • val rd1 = sc.parallelize(List("20180101", "20180102", "20180103", "20180104", "20180105","20180106"), 2)
  • val rd2 = rd1.mapPartitions(iter => {val dateFormat = new java.text.SimpleDateFormat("yyyyMMdd")  iter.map(dateStr => dateFormat.parse(dateStr))})
  • rd2.collect()
  • res1: Array[ java.util.Date] = Array(Mon Jan 01 00:00:00 UTC 2018, Tue Jan 02 00:00:00 UTC 2018, Wed Jan 0300:00:00 UTC 2018, Thu Jan 04 00:00:00 UTC 2018, Fri Jan 05 00:00:00 UTC 2018, Sat Jan 06 00:00:00 UTC 2018)

2.3.3 Transformation-flatMap
flatMap

  • def flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]
  • 將RDD中的每個元素通過func轉換為新的元素
  • 進行扁平化:合併所有的集合為一個新集合
  • 新的RDD叫做FlatMappedRDD

Example

  • val rd1 = sc.parallelize(Seq("I have a pen","I have an apple",  "I have a pen","I have a pineapple"), 2)  val rd2 = rd1.map(s => s.split(" "))
  • rd2.collect()
  • val rd3 = rd1.flatMap(s => s.split(" "))  
  • rd3.collect()
  • rd3.partitions
  • res136: Array[Array[String]] = Array(Array(I, have, a, pen), Array(I, have, an, apple), Array(I, have, a, pen), Array(I, have,a, pineapple))
  • res137: Array[String] = Array(I, have, a, pen, I, have, an, apple, I, have, a, pen, I, have, a, pineapple)

2.3.4Transformation-union
union

  • def union(other: RDD[T]): RDD[T]
  • 合併兩個RDD
  • 元素資料型別需要相同,並不進行去重操作

Example

  • val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))  val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
  • val rdd3 = sc.parallelize(Seq("Durian"))
  • val rddUnion = rdd1.union(rdd2).union(rdd3)  
  • rddUnion.collect.foreach(println)
  • res1: Array[String] = Array(Apple, Banana, Orange, Banana, Pineapple, Durian)

2.3.5 Transformation-distinct
distinct

  • def distinct(): RDD[T]
  • 對RDD中的元素進行去重操作

Example

  • val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))  val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
  • val rdd3 = sc.parallelize(Seq("Durian"))
  • val rddUnion = rdd1.union(rdd2).union(rdd3)
  •  val rddDistinct = rddUnion.distinct()  
  • rddDistinct.collect()
  • res1: Array[String] = Array(Orange, Apple, Banana, Pineapple, Durian)

2.3.6 Transformation-filter
filter

  • def filter(f: (T) ⇒ Boolean): RDD[T]
  • 對RDD元素的資料進行過濾
  • 當滿足f返回值為true時保留元素,否則丟棄

Example

  • val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
  •  val filteredRDD = rdd1.filter(item => item.length() >= 6)
  •  filteredRDD.collect()
  • res1: Array[String] = Array(Banana, Orange)

2.3.7 Transformation-intersection
interesction

  • def intersection(other: RDD[T]): RDD[T]
  • def intersection(other: RDD[T], numPartitions: Int): RDD[T]
  • def intersection(other: RDD[T], partitioner: Partitioner)(implicit  ord: Ordering[T] = null): RDD[T]
  • 對兩個RDD元素取交集

Example

  • val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))  
  • val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
  • val rddIntersection = rdd1.intersection(rdd2)
  • rddIntersection.collect()
  • res1: Array[String] = Array(Banana)

2.4  Key-Value型別 Transformation 運算元分類

2.4.1 Transformation-groupByKey

groupByKey

  • def groupByKey(): RDD[(K, Iterable[V])]
  • def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  • def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  • 對RDD[Key, Value]按照相同的key進行分組

Example

  • val scoreDetail = sc.parallelize(List(("xiaoming","A"), ("xiaodong","B"),("peter","B"), ("liuhua","C"), ("xiaofeng","A")), 3)  scoreDetail.map(score_info => (score_info._2, score_info._1)).groupByKey().collect().foreach(println(_))
  • scoreDetail: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[110] at parallelize(A,CompactBuffer(xiaoming, xiaofeng))  (B,CompactBuffer(xiaodong, peter))  (C,CompactBuffer(lihua))

2.4.2 Transformation-reduceByKey
reduceByKey

  • def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
  • def reduceByKey(func: (V, V) ⇒ V, numPartitions: Int): RDD[(K, V)]
  • def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
  • 對RDD[Key, Value]按照相同的key進行merge操作
  • 可以在shuffle前對當前節點上的結果先進行merge操作

Example

  • val scoreDetail = sc.parallelize(List("A", "B", "B", "D", "B", "D", "E", "A", "E"), 3)  scoreDetail.map(w => (w, 1)).reduceByKey(_ + _).collect()
  • scoreDetail: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[120] at parallelize  res1: Array[(String, Int)] = Array((B,3), (E,2), (A,2), (D,2))

2.4.3 Transformation-join
join

  • def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  • def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  • def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V,  W))]
  • 對兩個RDD根據key進行連線操作

Example

  • val data1 = sc.parallelize(Array(("A", 1),("b", 2),("c", 3)))
  • val data2 = sc.parallelize(Array(("A", 4),("A", 6),("b", 7),("c", 3),("c", 8)))
  • val result = data1.join(data2)
  • result.collect()
  • data1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD  data2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD  result: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD
  • res1: Array[(String, (Int, Int))] = Array((A,(1,4)), (A,(1,6)), (b,(2,7)), (c,(3,3)), (c,(3,8)))

2.5  Action 運算元分類

2.5.1 Action-count/countByKey/countByValue
count

  • def count(): Long
  • 從RDD中返回元素的個數

countByKey

  • def countByKey(): Map[K, Long]
  • 從RDD[K, V]中返回key出現的次數
  • val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)  
  • c.countByKey
  • res1: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)

countByValue

  • def countByValue(): Map[T, Long]
  • 統計RDD中值出現的次數
  • val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
  • b.countByValue
  • res1: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)

2.5.2 Action-collect/take/takeOrdered/top
collect

  • def collect(): Array[T]
  • 從RDD中返回所有的元素到Driver Program

take

  • def take(num: Int): Array[T]
  • 從RDD中取0到num – 1下標的元素,不排序

takeOrdered

  • def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  • 從RDD中返按從小到大(預設)返回num個元素

top

  • def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  • 和takeOrdered類似,但是排序順序從大到小

2.5.3 Action-reduce
reduce

  • def reduce(f: (T, T) => T): T
  • 對RDD中的元素進行聚合操作
  • 注意:reduceByKey是Transformation
  • 如果集合為空則會丟擲Exception

Example

  • val item = sc.parallelize(1 to 100, 3)  item.reduce((first, second) => first + second)  item.reduce(_ + _)
  • res1: Int = 5050
  • val item = sc.parallelize(List(1, 2, 3))  item.filter(_ > 4).reduce(_ + _)
  • java.lang.UnsupportedOperationException: empty collection

2.5.4 Action-reduce
reduce

  • def reduce(f: (T, T) => T): T
  • 對RDD中的元素進行聚合操作
  • 注意:reduceByKey是Transformation
  • 如果集合為空則會丟擲Exception

Example

  • val item = sc.parallelize(1 to 100, 3)  item.reduce((first, second) => first + second)  item.reduce(_ + _)
  • res1: Int = 5050
  • val item = sc.parallelize(List(1, 2, 3))  item.filter(_ > 4).reduce(_ + _)
  • java.lang.UnsupportedOperationException: empty collection

2.5.5 Action-fold
fold

  • def fold(zeroValue: T)(op: (T, T) ⇒ T): T
  • 類似於reduce,對RDD進行聚合操作
  • 首先每個分割槽分別進行聚合,初始值為傳入的zeroValue,然後對所有的分割槽進行聚合

Example

  • val item = sc.parallelize(1 to 100, 3)  item.fold(0)(_ + _)
  • res1: Int = 5050
  • val item = sc.parallelize(List(1, 2, 3))  item.filter(_ > 4).fold(0)(_ + _)
  • res198: Int = 0

2.5.6 Action-aggregateByKey
aggregate

  • def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U,  combOp: (U, U) => U): U
  • 允許傳入兩個function:seqOp和combOp
  • seqOp負責對每個分割槽進行聚合,初始值為zeroValue
  • combOp負責對每個分割槽聚合的結果進行合併

Example

  • val rdd = sc.parallelize(Seq(  ("A",110),("A",130),("A",120),("B",200),("B",206),("B",206),("C",150),("C",160),("C",170)))
  • val agg_rdd = rdd.aggregateByKey((0,0))((acc, value) => (acc._1 + value, acc._2 +  1),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
  • val avg = agg_rdd.mapValues(x => (x._1/x._2))
  • avg.collect
  • (B,204) (A,120) (C,160)

3,spark記憶體模型

Spark記憶體優化

 Executor最大任務並行度

  • TP = N/C
  • 其中N=spark.executor.cores, C=spark.task.cpus
  • 任務以Thread方式執行
  • 活躍執行緒可使用記憶體範圍(1/2n, 1/n) why?

 出現Executor OOM錯誤(錯誤程式碼137,143等)
原因:Executor Memory達到上限,解決辦法:

  • 增加每個Task記憶體使用量
  • 增大最大Heap值
  • 降低spark.executor.cores數量
  • 或者降低單個Task記憶體消耗量
  • 每個partition對應一個任務
  • 非SQL類應用 spark.default.parallism
  • SQL類應用 spark.sql.shuffle.partition

資料傾斜問題:可以採用對key值加鹽的方式解決。