spark原理解析和spark core
spark原理解析
- 解析一:resilient distributed dataset(RDD)
resilient distributed dataset(RDD):彈性分散式資料集,有容錯機制可並行執行。
分散式即體現在每個rdd分多個partition,每個partition在執行時為一個task,會被master分配到某一個worker執行器(Executor)的某一個core中。
彈性體現在流水線思想(pipe),即rdd方法分為transformations和actions方法,兩者的官方解釋為:RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset。transformations類方法在執行過程中,只會記錄每個rdd的依賴,不會立即執行,在這個過程中,可以彈性的處理partition。當action類方法執行時,會按照依賴,每個rdd去父rdd中要資料。
- 解析二:窄依賴(完全依賴)和寬依賴(部分依賴)
transformations類方法的依賴分為窄依賴(完全依賴)和寬依賴(部分依賴),窄依賴可以理解為:每個父rdd的一個分割槽的資料只會給一個子rdd的一個分割槽(一個task的資料只會給流水線下游的一個task),子rdd的分割槽中資料來自一個多個父rdd的分割槽的資料;寬依賴肯定會有某些或全部父rdd的task資料給多個子rdd的task。
當寬依賴時,需要進行shuffle,此時,會按照shuffle切分成一個個stage。
整個job的過程是一個有向無環圖(DAG),如下圖,是rdd方法leftOuterJoin執行時的一個DAG,rdd leftOuterJoin是寬依賴,因此要劃分stage,並會發生shuffle;當觸發action類方法如collect時會按照依賴往dirver拉資料時,會從rdd leftOuterJoin的task中拿資料,從而自下而上,觸發整個流水線作業。
Dependency
NarrowDependency(窄依賴)
OneToOneDependency
RangeDependency
ShuffleDependency(寬依賴)
- 解析三:shuffle
ShuffleManager
HashShuffleManager
SortShuffleManager(預設)
目前預設的shuffle方式為:SortShuffleManager
由於一個worker上執行多個task,每個worker上生成的所有臨時檔案數是reduce的數量
具體reduceByKey的shuffle過程如下,在map端會進行shuffle寫,會先寫到快取,然後寫到磁碟;在reduce端會進行shuffle讀,讀取時會判斷取遠端讀還是在本機讀,讀取時也會先寫到快取。
shuffle時,等map端的父stage寫完後,reduce端才會去進行fetch,fetch的時候是邊fetch邊處理,不會等全部fetch完再處理。
另外一種方式,hashShuffle,每個worker上會生成map*reduce個磁碟檔案,會增大磁碟io以及記憶體的壓力。
shuffle涉及的設定如下:
1、shuffle方式(sort、hash)
spark.shuffle.manager
2、spark.shuffle.file.buffer.kb
shuffle寫入快取的大小(預設32kb)
3、spark.reducer.maxMbInFlight
shuffle讀(reduce端)快取大小(預設48m)
4、spark.shuffle.compress
shuffle寫入磁碟是否壓縮,預設true
5、spark.shuffle.io.maxRetries
shuffle讀通過netty fetches讀時,失敗的最大嘗試次數,預設3
6、spark.shuffle.io.retryWait
5中每次等待幾秒(預設5s)
7、spark.shuffle.service.index.cache.size
- 解析四:task數量
當transformation方法時,就確定了map和reduce的task數量。
一般一個worker啟動一個Executor,預設每個Executor使用的core數(同一時間一個core只能執行一個task)為機器的所有核心數(即每個CPU的核數相加)
使用rdd方法建立一個rdd時,如果執行在cluster模式下,partition預設的數量為所有Executor的總core數。
reduce的partition的數量。由於reduce可能來自多個rdd,如果沒有自己實現分割槽器(partition)時,使用的是預設的分割槽器,此時如果配置檔案沒有配置引數時,使用的是父rdd的最大分割槽數,原始碼如下:
*reduce分割槽的數量
* 先按rdd1、rdd2的分割槽數進行降序排列,此時按續遍歷,如果發現有rdd自己定時了partitioner,就返回自己定義的;如果沒有定義,去查詢spark.default.parallelism,如果沒有該配置,返回父rdd分割槽數最高的一個分割槽;上面rdd3的分割槽為3個,取最高的
* 預設是HashPartitioner 還有一個RangePartitioner
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)
}
}
解析五:打包釋出,使用資源管理yarn
程式如下:
object TestSparkCore {
def main(args: Array[String]): Unit = {
// 1. 建立spark context
val conf = new SparkConf()
conf.setAppName("first") // 設定應用程式的名字
// conf.setMaster("spark://focuson1:7077") // 等價於 --master 引數
conf.set("spark.shuffle.manager", "hash") // 修改shuffle的實現類
val sc = new SparkContext(conf)
test_reduceByKey(sc)
sc.stop()
}
打成jar包,傳到focuson1上,執行下面語句。
spark-submit --master yarn-cluster --class com.bd.spark.core.TestSparkCore my_first_app.jar
--master yarn-cluster 使用yarn或spark://ip:7077
--class 執行的類的,有包要寫上包
--conf 配置 如--conf spark.shuffle.manager=hash
--driver-class-path jar包路徑,不會發布到全部worker
--jars jar包路徑,會發布到全部worker
--application-arguments 傳遞給主方法的引數
*像conf、appname等程式的優先順序大於spark-submit
spark core
- core之rdd方法
- rdd action
- /*
- * actions方法
- //collect,從每個worker中拿資料,在driver中顯示。driver可能會oom
- //takeordered是升序從每個分割槽中(一個rdd有多個分割槽,每個分割槽是一個task)拿出i個數據,拿到driver進行比較,拿出i個數據
- //top是降序,類似takeordered
- */
- def test_reduce(sc: SparkContext) = {
- val rdd = sc.makeRDD(List("hello world", "hello count", "world spark"))
- rdd.reduce((x, y) => (x + y))
- }
- def test_countApprox(sc: SparkContext) = {
- //在資料量特別大,不需要精確結果時,求一個近似值
- val rdd3 = sc.parallelize(List(1, 2, 3, 4, 5, 6))
- rdd3.countApprox(1000l)
- }
- def test_saveAsTextFile(sc: SparkContext) = {
- val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
- rdd.saveAsTextFile("hdfs://focuson1:9000/spark")
- }
- rdd transformation
- def test_flatMap(sc: SparkContext) = {
- val rdd = sc.makeRDD(List("hello world", "hello count", "world spark"), 2)
- val rdd2 = rdd.flatMap { x => x.split(" ") }
- println(rdd2.collect())
- //res1: Array[String] = Array(hello, world, hello, count, world, spark)
- }
- def test_union(sc: SparkContext) = {
- //執行時,在一個stage內,分為三個rdd 求並積 分割槽是rdd1+rdd2
- val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
- val rdd2 = sc.parallelize(List(5, 6, 7, 8, 9, 10))
- val rdd3 = rdd1 ++ rdd2
- //res54: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
- }
- def test_cartesian(sc: SparkContext) {
- //分割槽數為rdd1*rdd2 笛卡爾積
- val rdd1 = sc.parallelize(List("dog", "cat", "tiger"))
- val rdd2 = sc.parallelize(List(1, 2))
- rdd1.cartesian(rdd2)
- //res5: Array[(String, Int)] = Array((dog,1), (dog,2), (cat,1), (cat,2), (tiger,1), (tiger,2))
- }
- //mapPartitionsWithIndex
- def test_mapPartitionsWithIndex(sc: SparkContext) = {
- val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
- var list = List[String]()
- // 分割槽0的所有資料+a , 分割槽1的資料 +b
- rdd.mapPartitionsWithIndex((i, iter) => {
- while (iter.hasNext) {
- if (i == 0)
- list = list :+ (iter.next + "a")
- else {
- list = list :+ (iter.next + "b")
- }
- }
- list.iterator
- })
- }
- def test_zip(sc: SparkContext) = {
- //兩個rdd元素必須相等
- val list = sc.makeRDD(List(43, 5, 2, 5, 6, 33))
- val list2 = sc.makeRDD(List("a", "b", "c", "d", "e", "f"))
- list.zip(list2).collect
- //res29: Array[(Int, String)] = Array((43,a), (5,b), (2,c), (5,d), (6,e), (33,f))
- }
- def test_reparition(sc: SparkContext) = {
- val rdd3 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
- rdd3.coalesce(4) //預設是false,即分割槽由多變少,此時由2變為4不能成功,還是兩個分割槽
- rdd3.coalesce(4, true) //此時會成功
- /**
- * def coalesce(numPartitions: Int, shuffle: Boolean = false)
- * 預設是false,即分割槽由多變少,有多變少不會進行shuffle;true時會進行分割槽,此時會進行shuffle
- */
- /*
- def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
- coalesce(numPartitions, shuffle = true)
- }
- */
- rdd3.repartition(4) //相當於 rdd3.coalesce(4, true)
- }
- def test_reduceByKey(sc: SparkContext) = {
- //reduceByKey就是把key值進行分組,然後每組內進行reduce
- val rdd = sc.makeRDD(List(("hello", 1), ("hello", 1), ("hello", 1), ("world", 1), ("world", 1)))
- val rdd2 = rdd.reduceByKey { (x, y) => x + y }
- //res2: Array[(String, Int)] = Array((hello,3), (world,2))
- }
- def test_intersection(sc: SparkContext) = {
- //取兩個rdd的交集
- val rdd1 = sc.parallelize(List("dog", "cat", "tiger"), 2)
- val rdd2 = sc.parallelize(List("dog", "wolf", "pig"), 3)
- val rdd3 = rdd1.intersection(rdd2)
- //res23: Array[String] = Array(dog)
- }
- def test_sortBy(sc: SparkContext) = {
- val list = sc.makeRDD(List(43, 5, 2, 5, 6, 33))
- list.sortBy(x => x) //升序
- list.sortBy(x => x, false) //降序
- }
- def test_aggregateByKey(sc: SparkContext) = {
- import scala.math._
- val rdd = sc.parallelize(List(("pig", 3), ("cat", 2), ("dog", 5), ("cat", 4), ("dog", 3), ("cat", 3), ("cat", 7), ("cat", 4)), 2)
- rdd.aggregateByKey(0)((x, y) => x + y, (x, y) => x * y)
- /* partition:[0]
- (pig,3)
- (cat,2)
- (dog,5)
- (cat,4)
- partition:[1]
- (dog,3)
- (cat,3)
- (cat,7)
- (cat,4)*/
- //同一個分割槽內根據key進行分組,然後每組的value值進行第一個表示式的reduce操作
- /* partition:[0]
- (pig,3)
- (cat,6)
- (dog,5)
- partition:[1]
- (dog,3)
- (cat,14)
- (cat,7)
- */
- //然後對各個分割槽的所有資料按key進行分割槽,然後按對value值進行reduce
- //res38: Array[(String, Int)] = Array((dog,15), (pig,3), (cat,84))
- //引數0(zeroValue)是指參與第一個表示式的運算,即每個分割槽內按分割槽之後每個組都有一個zeroValue值。如果rdd.aggregateByKey(100)((x,y)=>x+y, (x,y)=>x*y)
- rdd.aggregateByKey(100)((x, y) => x + y, (x, y) => x * y)
- /* partition:[0]
- (pig,3)
- (cat,2)
- (dog,5)
- (cat,4)
- partition:[1]
- (dog,3)
- (cat,3)
- (cat,7)
- (cat,4)*/
- //同一個分割槽內根據key進行分組,然後每組的value值進行第一個表示式的reduce操作
- /* partition:[0]
- (pig,103)
- (cat,106)
- (dog,105)
- partition:[1]
- (dog,103)
- (cat,114)
- (cat,107)
- */
- //然後對各個分割槽的所有資料按key進行分割槽,然後按對value值進行reduce
- //res40: Array[(String, Int)] = Array((dog,10815), (pig,103), (cat,12084))
- }
- def test_cogroup(sc: SparkContext) = {
- val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3)))
- val rdd2 = sc.parallelize(List(("cat", 2), ("dog", 2)))
- rdd1.cogroup(rdd2)
- //res49: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((dog,(CompactBuffer(1),CompactBuffer(2))), (cat,(CompactBuffer(1, 3),CompactBuffer(2))))
- }
- def test_combineByKey(sc: SparkContext) = {
- }
- def test_groupBykey(sc: SparkContext) = {
- val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3), ("dog", 2)))
- rdd1.groupByKey()
- //res46: Array[(String, Iterable[Int])] = Array((dog,CompactBuffer(1, 2)), (cat,CompactBuffer(1, 3)))
- //Groupbykey會將所有的資料發給reducer,reducer壓力會比較大,另外會比較佔用網路頻寬, 相比之下,reduceByKey, 會在mapper端首先進行運算,reducer的壓力小,另外也可以節省網路頻寬
- }
- def test_join(sc: SparkContext) = {
- val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3)))
- val rdd2 = sc.parallelize(List(("cat", 2), ("dog", 2), ("tiger", 2)))
- rdd1.join(rdd2)
- //Array((dog,(1,2)), (cat,(1,2)), (cat,(3,2))) 將兩個rdd集合中key相同的元素連線在一起 沒有tiger
- }
- def test_leftOuterJoin(sc: SparkContext) = {
- val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3), ("wolf", 1)))
- val rdd2 = sc.parallelize(List(("cat", 2), ("dog", 2), ("tiger", 2)))
- val array = rdd1.leftOuterJoin(rdd2) // 左外連線
- //Array((wolf,(1,None)), (dog,(1,Some(2))), (cat,(1,Some(2))), (cat,(3,Some(2))))
- for ((k, v) <- array) {
- println("key:" + k + " value:" + v._2.getOrElse(0))
- }
- }
- cache
def test_cache(sc: SparkContext) = {
val rdd = sc.parallelize(List()).cache() //快取 (不會壓縮)
/*
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
*/
val rdd2 = sc.parallelize(List()).persist() //該方法等於cache,預設是MEMORY_ONLY
/*
spark.storage.memoryFraction = 0.6//這個意思表示0.6的記憶體作為快取,其餘的作為計算記憶體
StorageLevel.DISK_ONLY 只存到磁碟
StorageLevel.DISK_ONLY_2 在其他worker也快取一份
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK2
StorageLevel.MEMORY_AND_DISK_SER//SER表是序列化壓縮
StorageLevel.MEMORY_AND_DISK_SER2
StorageLevel.MEMORY_ONLY_SER
StorageLevel.MEMORY_ONLY_SER2
StorageLevel.NONE
StorageLevel.OFF_HEAP//Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
*/
val rdd3 = sc.parallelize(List()).persist(StorageLevel.OFF_HEAP)
}