Spark-core知識體系總結
什麼是RDD
RDD(Resilient Distributed Dataset)叫做彈性分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。RDD具有資料流模型的特點:自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。
RDD包含5個特徵:
1、一個分割槽的列表
2、一個計算函式compute,對每個分割槽進行計算
3、對其他RDDs的依賴(寬依賴、窄依賴)列表
4、對key-value RDDs來說,存在一個分割槽器(Partitioner)【可選的】
5、對每個分割槽有一個優先位置的列表【可選的】
- 一組分片(Partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。
- 一個計算每個分片的函式。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。
- RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。
- 一個Partitioner,即RDD的分片函式。當前Spark中實現了兩種型別的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
- 一個列表,儲存存取每個Partition的優先位置(preferred location)。對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。
建立RDD
(1)在你的 driver program(驅動程式)中 parallelizing 一個已經存在的Scala集合。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
檢視該rdd的分割槽數量,預設是程式所分配的cpu core的數量,也可以在建立的時候指定
rdd1.partitions.length
建立的時候指定分割槽數量:
val rdd1 = sc.parallelize(Array(1,2,3.4),3)
(2)由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile(“hdfs://hadoop141:8020/words.txt”)
RDD總結
RDD是Spark的最基本抽象,是對分散式記憶體的抽象使用,實現了以操作本地集合的方式來操作分散式資料集的抽象實現。RDD是Spark最核心的東西,它表示已被分割槽,不可變的並能夠被並行操作的資料集合,不同的資料集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到記憶體中,每次對RDD資料集的操作之後的結果,都可以存放到記憶體中,下一個操作可以直接從記憶體中輸入,省去了MapReduce大量的磁碟IO操作。這對於迭代運算比較常見的機器學習演算法, 互動式資料探勘來說,效率提升非常大。
RDD是Spark中的抽象資料結構型別,任何資料在Spark中都被表示為RDD。從程式設計的角度來看,RDD可以簡單看成是一個數組。和普通陣列的區別是,RDD中的資料是分割槽儲存的,這樣不同分割槽的資料就可以分佈在不同的機器上,同時可以被並行處理。因此,Spark應用程式所做的無非是把需要處理的資料轉換為RDD,然後對RDD進行一系列的變換和操作從而得到結果。
tips:spark-shell --total-executor-cores 1 只在standalone 和 mesos模式下有用
rdd1.mapPartitionsWithIndex((idx, iter)=>{
Iterator(s"[$idx;${iter.toArray.mkString(",")}]\n")
}).collect
spark原始碼之型別引數
- Scala的類和方法、函式都可以是泛型,在Spark原始碼中可以到處看到類和方法的泛型,在實際例項化的時候指定具體的型別,例如Spark最核心、最基礎、最重要的抽象資料結構RDD裡面關於RDD的類的定義是泛型的,RDD的幾乎所有方法的定義也都是泛型的,之所以這麼做,是因為RDD會派生很多子類,通過子類適配了各種不同的資料來源以及業務邏輯操作;
- 關於對型別邊界的限定,分為上邊界和下邊界:
上邊界:表達了泛型的型別必須是某種型別或者某種類的子類,語法為<: ,這裡的一個新的現象是對型別進行限定;
下邊界:表達了泛型的型別必須是某種型別或者某種類的父類,語法為>: ; - View Bounds,可以進行某種神祕的轉換,把你的型別可以在沒有知覺的情況下轉換成為目標型別,其實你可以認為View Bounds是上邊界和下邊界的加強補充版本,例如在SparkContext這個Spark的核心類中有T <% Writable方式的程式碼,這個程式碼所表達的是T必須是Writable型別的,但是T有沒有直接繼承自Writable介面,此時就需要通過“implicit”的方式來實現這個功能;
- T:ClassTag,例如Spark原始碼中的RDD class RDD[T:ClassTag] 這個其實也是一種型別轉換系統,只是在編譯的時候型別資訊不夠,需要藉助於JVM的runtime來通過執行時資訊獲得完整的型別資訊,這在Spark中是非常重要的,因為Spark的程式的程式設計和執行是區分了Driver和Executor的,只有在執行的時候才知道完整的型別資訊。
RDD的特點
分割槽:RDD邏輯上是分割槽的,每個分割槽的資料是抽象存在的,計算的時候會通過一個compute函式得到每個分割槽的資料。如果RDD是通過已有的檔案系統構建,則compute函式是讀取指定檔案系統中的資料,如果RDD是通過其他RDD轉換而來,則compute函式是執行轉換邏輯將其他RDD的資料進行轉換。
只讀:RDD是隻讀的,要想改變RDD中的資料,只能在現有的RDD基礎上建立新的RDD;
由一個RDD轉換到另一個RDD,可以通過豐富的操作運算元(map、filter、union、join、reduceByKey… …)實現,不再像MR那樣只能寫map和reduce了。
依賴:RDDs通過操作運算元進行轉換,轉換得到的新RDD包含了從其他RDDs衍生所必需的資訊,RDDs之間維護著這種血緣關係(lineage),也稱之為依賴。依賴包括兩種,一種是窄依賴,RDDs之間分割槽是一一對應的;另一種是寬依賴,下游RDD的每個分割槽與上游RDD(也稱之為父RDD)的每個分割槽都有關,是多對多的關係。
持久化(快取):可以控制儲存級別(記憶體、磁碟等)來進行持久化。如果在應用程式中多次使用同一個RDD,可以將該RDD快取起來,該RDD只有在第一次計算的時候會根據血緣關係得到分割槽的資料,在後續其他地方用到該RDD的時候,會直接從快取處取而不用再根據血緣關係計算,這樣就加速後期的重用。
Spark程式設計模型
RDD被表示為物件;
通過物件上的方法呼叫來對RDD進行轉換;
最後輸出結果 或是 向儲存系統儲存資料;
RDD轉換運算元被稱為Transformation;
只有遇到Action運算元,才會執行RDD的計算(懶執行)
在Executor中完成資料的處理,資料有以下幾種:
1、Scala集合資料(測試)
2、檔案系統、DB(SQL、NOSQL)的資料
3、RDD
4、網路
Driver 主要是對SparkContext進行配置、初始化以及關閉。初始化SparkContext是為了構建Spark應用程式的執行環境,在初始化SparkContext,要先匯入一些Spark的類和隱式轉換;在Executor部分執行完畢後,需要將SparkContext關閉。
SparkContext是編寫Spark程式用到的第一個類,是Spark的主要入口點,它負責和整個叢集的互動;
如把Spark叢集當作服務端,那麼Driver就是客戶端,SparkContext則是客戶端的核心; SparkContext是Spark的對外介面,負責向呼叫者提供Spark的各種功能。
SparkContext用於連線Spark叢集、建立RDD、累加器、廣播變數;
1、SparkConf。SparkConf為Spark配置類,配置已鍵值對形式儲存;配置項包括:master、appName、Jars、ExecutorEnv等等;
2、SparkEnv。SparkEnv可以說是Context中非常重要的類,它維護著Spark的執行環境,包含有:serializer、RpcEnv、Block Manager、記憶體管理等;
3、DAGScheduler。高層排程器,將Job按照RDD的依賴關係劃分成若干個TaskSet,也稱為Stage;之後結合當前快取情況及資料就近的原則,將Stage提交給TaskScheduler;
4、TaskScheduler。負責任務排程資源的分配
5、SchedulerBackend。負責叢集資源的獲取和排程。
// SparkConf
sc.getConf.toDebugString
sc.getConf.getOption(“spark.app.name”)
sc.getConf.getOption(“spark.master”)
sc.getConf.getAll
// SparkEnv
import org.apache.spark._
SparkEnv.get
SparkEnv.get.memoryManager
SparkEnv.get.shuffleManager
RDD程式設計API—包含兩種運算元
參考文件:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
轉換得到的RDD是惰性求值的。也就是說,整個轉換過程只是記錄了轉換的軌跡,並不會發生真正的計算,只有遇到行動操作時,才會發生真正的計算,開始從血緣關係(lineage)源頭開始,進行物理的轉換操作;
常見Transformation運算元
map(func):對呼叫map的RDD資料集中的每個element都使用func,然後返回一個新的RDD,這個返回的資料集是分散式的資料集
filter(func):對呼叫filter的RDD資料集中的每個元素都使用func,然後返回一個包含使func為true的元素構成的RDD
flatMap(func):和map差不多,但是flatMap生成的是多個結果
mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition
mapPartitionsIndex(func):逐個處理每一個partition,使用迭代器it訪問每個partition的行,index儲存partition的索引
sample(withReplacement, fraction, seed):抽樣
union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合
distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函式接受的key-valuelist
reduceByKey(func,[numTasks]):就是用一個給定的reducefunc再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數
sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean型別
join(otherDataset,[numTasks]):當有兩個KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks為併發的任務數
cogroup(otherDataset,[numTasks]):當有兩個KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks為併發的任務數
cartesian(otherDataset):笛卡爾積就是m*n
行動(Action)操作及常見運算元
Action觸發了Job的執行,application中如果有多個Action,那麼對應多個job。(看原始碼)
reduce(func):傳入的函式是兩個引數輸入返回一個值,傳入函式必須滿足交換律和結合律
collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組
count():返回的是dataset中的element的個數
first():返回的是dataset中的第一個元素
take(n):返回前n個elements
takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素
備註:與sample類似,但第二個引數不是百分比
saveAsTextFile(path):把dataset寫到一個textfile中,或者hdfs,或者hdfs支援的檔案系統中,spark把每條記錄都轉換為一行記錄,然後寫到file中
countByKey():返回的是key對應的個數的一個map,作用於一個RDD
foreach(func):對dataset中的每個元素都使用func
注意:reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作;
當採用groupByKey時,Spark將所有的鍵值對(key-value pair)都移動,叢集節點之間的開銷很大
reduceByKey的效率高,在能使用reduceByKey的地方就不要使用groupByKey。
常見的Pair RDD轉換操作
keys
把Pair RDD中的key返回形成一個新的RDD
values
把Pair RDD中的value返回形成一個新的RDD
sortByKey
返回一個根據鍵排序的RDD
mapValues(func)
對鍵值對RDD中的每個value都應用一個函式,但是,key不會發生變化
join
join表示內連線。對於給定的兩個輸入資料集(K,V1)和(K,V2),在兩個資料集中都存在的key會被輸出,最終得到一個(K,(V1,V2))型別的資料集
RDD控制運算元
Spark中控制運算元也是懶執行的,需要Action運算元觸發才能執行,主要是為了對資料進行快取。
控制運算元有三種,cache,persist,checkpoint,以上運算元都可以將RDD持久化,持久化的單位是partition。RDD控制運算元都是懶執行的。必須有一個action類運算元觸發執行。checkpoint運算元不僅能將RDD持久化到磁碟,還能切斷RDD之間的依賴關係。
RDD的快取(持久化)
快取是將計算結果寫入不同的介質,使用者定義可定義儲存級別(儲存級別定義了快取儲存的介質,目前支援記憶體、堆外記憶體、磁碟);
通過快取,Spark避免了RDD上的重複計算,能夠極大地提升計算速度;
Spark速度非常快的原因之一,就是在記憶體中持久化(或快取)一個數據集。當持久化一個RDD後,每一個節點都將把計算的分片結果儲存在記憶體中,並在對此資料集(或者衍生出的資料集)進行的其他動作(action)中重用。這使得後續的動作變得更加迅速;
RDD相關的持久化或快取,是Spark最重要的特徵之一。可以說,快取是Spark構建迭代式演算法和快速互動式查詢的關鍵因素;
使用persist()方法對一個RDD標記為持久化。之所以說“標記為持久化”,是因為出現persist()語句的地方,並不會馬上計算生成RDD並把它持久化,而是要等到遇到第一個行動操作觸發真正計算以後,才會把計算結果進行持久化;
通過persist()或cache()方法可以標記一個要被持久化的RDD,一旦首次被觸發,該RDD將會被保留在計算節點的記憶體中並重用;
什麼時候該快取資料,需要對空間和速度進行權衡,垃圾回收開銷的問題讓情況變的更復雜。一般情況下,如果多個動作需要用到某個 RDD,而它的計算代價又很高,那麼就應該把這個 RDD 快取起來;
快取有可能丟失,或者儲存於記憶體的資料由於記憶體不足而被刪除。RDD的快取的容錯機制保證了即使快取丟失也能保證計算的正確執行。通過基於RDD的一系列的轉換,丟失的資料會被重算。RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。
persist()的引數可以指定持久化級別引數;
persist(MEMORY_ONLY):表示將RDD作為反序列化的物件儲存於JVM中,如果記憶體不足,就要按照LRU(Least recently used,最近最少使用)原則替換快取中的內容;
persist(MEMORY_AND_DISK)表示將RDD作為反序列化的物件儲存在JVM中,如果記憶體不足,超出的分割槽將會被存放在硬碟上【備註:並不是在記憶體和磁碟上都放,而是優先使用記憶體,如果記憶體不夠才會使用磁碟】;
使用cache()方法時,會呼叫persist(MEMORY_ONLY),即:
cache() == persist(MEMORY_ONLY)
可使用unpersist()方法手動地把持久化的RDD從快取中移除;
Storage的級別
如何選擇快取級別
應該如何選取持久化的儲存級別,實際上儲存級別的選取就是Memory與CPU之間的雙重權衡,可參考以下內容:
(1)如果RDD的資料可以很好的相容預設儲存級別(MEMORY_ONLY),那麼優先使用它,這是CPU工作最為高效的一種方式,可以很好地提高執行速度;
(2)如果(1)不能滿足,則嘗試使用MEMORY_ONLY_SER,且選擇一種快速的序列化工具,也可以達到一種不錯的效果;
(3)一般情況下不要把資料持久化到磁碟,除非計算是非常“昂貴”的或者計算過程會過濾掉大量資料,因為重新計算一個分割槽資料的速度可能要高於從磁碟讀取一個分割槽資料的速度;
(4)如果需要快速的失敗恢復機制,則使用備份的儲存級別,如MEMORY_ONLY_2、MEMORY_AND_DISK_2;雖然所有的儲存級別都可以通過重新計算丟失的資料實現容錯,但是快取機制使得大部分情況下應用無需中斷,即資料丟失情況下,直接使用快取資料,而不需要重新計算資料的過程;
(5)如果處於大記憶體或多應用的場景下,OFF_HEAP可以帶來以下的好處:
它允許Spark Executors可以共享Tachyon的記憶體資料;
它很大程式上減少JVM垃圾回收帶來的效能開銷;
Spark Executors故障不會導致資料丟失。
最後,Spark可以自己監測“快取”空間的使用,並使用LRU演算法移除舊的分割槽資料。也可以通過顯式呼叫RDD unpersist()手動移除資料。
RDD分割槽
分割槽的目的:設定合理的並行度,提高資料處理的效能。
在分散式程式中,通訊的代價是很大的,因此控制資料分佈以獲得最少的網路傳輸可以提升整體效能。對RDD進行合理的分割槽,可以減少網路傳輸的代價,進而提高系統性能;
RDD分割槽的一個分割槽原則是:
儘可能使得分割槽的個數,等於叢集核心數目;
儘可能使同一 RDD 不同分割槽內的記錄的數量一致;
建立操作中,開發者可以手動指定分割槽的個數,例如:
sc.parallelize(arr, 2) 表示建立得到的 RDD 分割槽個數為 2,在沒有指定分割槽個數的情況下,Spark 會根據叢集部署模式,來確定一個分割槽個數預設值。
rdd1.getNumPartitions
rdd1.partitions.size
對於 parallelize(makeRDD) 方法,預設情況下,分割槽的個數會受 Spark 配置引數 spark.default.parallelism 的影響,該引數也用於控制 Shuffle 過程中預設使用的任務數量。
無論是local模式、Standalone 模式、Yarn 模式或者是 Mesos 模式來執行 Spark,分割槽的默認個數等於 spark.default.parallelism 的指定值,若該值未設定,Spark 根據不同叢集模式,來確定這個值。
- local模式,預設分割槽個數等於本地機器的 CPU 核心總數(或者是使用者通過 local[N] 引數指定分配給 Apache Spark 的核心數目)。這樣把每個分割槽的計算任務交付給單個核心執行,能夠保證最大的計算效率;
- Standalone 或者 Yarn,預設分割槽個數等於叢集中所有核心數目的總和,或者 2,取兩者中的較大值;
- 若使用 Apache Mesos 作為叢集的資源管理系統,預設分割槽個數等於 8;
對於 textFile 方法,預設情況下:
每個HDFS的分割槽檔案(預設塊大小128M),每個都會建立一個RDD分割槽;
對於本地檔案,預設分割槽個數等於 min(defaultParallelism, 2);
可以使用下列方式對RDD的分割槽數進行修改:
rdd.textFile("", n)
rdd.parallelize(arr, n)
還可以使用 repartition(有shuffle)、coalesce 對RDD進行重分割槽
備註:呼叫data.repartition後,data的分割槽數並不會改變,而是返回一個新的RDD,其分割槽數等於repartition後的分割槽數。
寬依賴、窄依賴
RDD的依賴分為兩種:窄依賴(Narrow Dependencies)與寬依賴(Wide Dependencies,原始碼中稱為Shuffle Dependencies)
依賴有2個作用,其一用來解決資料容錯;其二用來劃分stage。
窄依賴:每個父RDD的一個Partition最多被子RDD的一個Partition所使用(1:1 或 n:1)。例如map、filter、union等操作會產生窄依賴;
寬依賴:一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作會產生寬依賴;
相比於寬依賴,窄依賴對優化很有利,主要基於以下兩點:
1、寬依賴對應著shuffle操作,需要在執行過程中將同一個父RDD的分割槽傳入到不同的子RDD分割槽中,中間可能涉及多個節點之間的資料傳輸;而窄依賴的每個父RDD的分割槽只會傳入到一個子RDD分割槽中,通常可以在一個節點內完成轉換。
2、當RDD分割槽丟失時(某個節點故障),spark會對資料進行重算。對於窄依賴,由於父RDD的一個分割槽只對應一個子RDD分割槽,這樣只需要重算和子RDD分割槽對應的父RDD分割槽即可,所以這個重算對資料的利用率是100%;
對於寬依賴,重算的父RDD分割槽對應多個子RDD分割槽,這樣實際上父RDD 中只有一部分的資料是被用於恢復這個丟失的子RDD分割槽的,另一部分對應子RDD的其它未丟失分割槽,這就造成了多餘的計算;更一般的,寬依賴中子RDD分割槽通常來自多個父RDD分割槽,極端情況下,所有的父RDD分割槽都要進行重新計算。
重算的效用不僅在於算的多少,還在於有多少是冗餘的計算
窄依賴允許在一個叢集節點上以流水線的方式(pipeline)計算所有父分割槽。例如,逐個元素地執行map、然後filter操作;而寬依賴則需要首先計算好所有父分割槽資料,然後在節點之間進行Shuffle。
窄依賴能夠更有效地進行失效節點的恢復,即只需重新計算丟失RDD分割槽的父分割槽,而且不同節點之間可以平行計算;而對於一個寬依賴關係的Lineage圖,單個節點失效可能導致這個RDD的所有祖先丟失部分分割槽,因而可能導致整體重新計算。
RDD容錯
為什麼做checkpoint
分散式計算中難免因為網路,儲存等原因出現計算失敗的情況,RDD中的lineage資訊常用來在task失敗後重計算使用,為了防止計算失敗後從頭開始計算造成的大量開銷,RDD會checkpoint計算過程的資訊,這樣作業失敗後從checkpoint點重新計算即可,提高效率。
Checkpoint是針對整個RDD計算鏈條中特別需要資料持久化的環節(後面會反覆使用當前環節的RDD)開始基於HDFS等的資料持久化複用策略,通過對RDD啟動Checkpoint機制來實現容錯和高可用;
什麼時候寫checkpoint資料
當RDD的action運算元觸發計算結束後會執行checkpoint。
只有在Action觸發Job的時候才會進行checkpoint。Spark在執行完Job之後會判斷是否需要checkpoint。
什麼時候讀checkpoint資料
task計算失敗的時候會從checkpoint讀取資料
會被重複使用的(但是)不能太大的RDD需要persist或者cache 。
哪些 RDD 需要 checkpoint?運算時間很長或運算量太大才能得到的 RDD,computing chain 過長或依賴其他 RDD 很多的 RDD。
checkpoint與persist或者cache的區別在於,持久化只是將資料儲存在BlockManager中但是其lineage是不變的,但是checkpoint執行完後,rdd已經沒有依賴RDD,只有一個checkpointRDD,checkpoint之後,RDD的lineage就改變了(斬斷依賴)。而且,持久化的資料丟失的可能性較大,因為可能磁碟或記憶體被清理,但是checkpoint的資料通常儲存到hdfs上,放在了高容錯檔案系統。
rdd.persist(StorageLevel.DISK_ONLY) 與 checkpoint 也有區別。前者雖然可以將 RDD 的 partition 持久化到磁碟,但該 partition 由 blockManager 管理。一旦 driver program 執行結束,也就是 executor 所在程序 CoarseGrainedExecutorBackend stop,blockManager 也會 stop,被 cache 到磁碟上的 RDD 也會被清空(整個 blockManager 使用的 local 資料夾被刪除)。
而 checkpoint 將 RDD 持久化到 HDFS 或本地資料夾,如果不被手動 remove 掉,是一直存在的。
分割槽器(partitioner)
在Spark中分割槽器直接決定了:
RDD中分割槽的個數;
RDD中每條資料經過Shuffle過程屬於哪個分割槽;
reduce的個數;
只有Key-Value型別的RDD才有分割槽器,非Key-Value型別的RDD分割槽器的值是None。
分割槽器的作用及分類:
在PairRDD(key,value)中,很多操作都是基於key的,系統會按照key對資料進行重組,如groupbykey;
資料重組需要規則,最常見的就是基於Hash分割槽,Spark還提供了一種複雜的基於抽樣的Range分割槽方法;
HashPartitioner:是最簡單也是預設提供的分割槽器。對於給定的key,計算其hashCode,併除以分割槽的個數取餘,如果餘數小於0,則用餘數+分割槽的個數,最後返回的值就是這個key所屬的分割槽ID。該分割槽方法可以保證key相同的資料出現在同一個分割槽中。
使用者可通過partitionBy主動使用分割槽器,通過partitions引數指定想要分割槽的數量。
RangePartitioner:簡單的說就是將一定範圍內的數對映到某一個分割槽內。演算法比較複雜。sortByKey會使用RangePartitioner。
自定義分割槽器:Spark允許使用者通過自定義的Partitioner物件,靈活的來控制RDD的分割槽方式。
共享變數
當Spark在叢集的多個不同節點的多個任務上並行執行一個函式時,它會把函式中涉及到的每個變數,在每個任務上都生成一個副本;
有時候需要在多個任務之間共享變數,或者在任務(Task)和任務控制節點(Driver Program)之間共享變數;
為了滿足這種需求,Spark提供了兩種型別的變數:
- 廣播變數(broadcast variables)
- 累加器(accumulators)
廣播變數將變數在節點的Executor之間進行共享(driver廣播出去);
累加器則支援在所有不同節點之間進行累加計算(比如計數或者求和);
廣播變數
廣播變數(broadcast variables)允許在每個機器上快取一個只讀的變數,而不是為機器上的每個任務都生成一個副本;
Spark的Action操作會跨越多個階段(stage),對於每個階段內的所有任務所需要的公共資料,Spark都會自動進行廣播;
可以通過呼叫SparkContext.broadcast(v)來從一個普通變數v中建立一個廣播變數。這個廣播變數就是對普通變數v的一個包裝器,通過呼叫value方法就可以獲得這個廣播變數的值,具體程式碼如下:
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value
這個廣播變數被建立以後,在叢集中的任何函式中,都可以使用廣播變數中的值,這樣就不會把v重複分發到這些節點上
此外,一旦廣播變數建立後,普通變數v的值就不能再發生修改,從而確保所有節點都獲得這個廣播變數的相同的值
累加器
累加器是僅僅被相關操作累加的變數,通常可以被用來實現計數器(counter)和求和(sum)。Spark原生地支援數值型(numeric)的累加器,程式開發人員可以編寫對新型別的支援;
一個數值型的累加器,可以通過呼叫SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()來建立;
執行在叢集中的任務,就可以使用add方法來把數值累加到累加器上。但是,這些任務只能做累加操作,不能讀取累加器的值,只有任務控制節點(Driver Program)可以使用value方法來讀取累加器的值。
val accum = sc.longAccumulator(“My Accumulator”)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value
Spark Core原理
Shuffle
Shuffle的本意是洗牌,目的是為了把牌弄亂。
Spark、Hadoop中的shuffle可不是為了把資料弄亂,而是為了將隨機排列的資料轉換成具有一定規則的資料。
Shuffle是MapReduce計算框架中的一個特殊的階段,介於Map 和 Reduce 之間。當Map的輸出結果要被Reduce使用時,輸出結果需要按key排列,並且分發到Reducer上去,這個過程就是shuffle。
shuffle涉及到了本地磁碟(非hdfs)的讀寫和網路的傳輸,大多數Spark作業的效能主要就是消耗在了shuffle環節。因此shuffle效能的高低直接影響到了整個程式的執行效率