RDD與共享變數
RDD和共享變數是Spark中的兩個重要抽象。
RDD
彈性分散式資料集, 是分散式記憶體的一個抽象概念,RDD提供了一種高度受限的共享記憶體模型,即RDD是隻讀的記錄分割槽的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而建立,然而這些限制使得實現容錯的開銷很低。
RDD的建立
- 從檔案系統中載入資料建立
- 通過並行集合(資料)建立
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)
//也可從列表中建立
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)
RDD轉換和Action操作
對於RDD而言,每一次轉換操作都會產生不同的RDD,供給下一個“轉換”使用。轉換得到的RDD是惰性求值的,也就是說,整個轉換過程只是記錄的轉換的軌跡,並不會發生真正的計算,只有遇到行動操作時,才會發生真正的計算,從血緣關係源頭開始,進行物理的轉換操作。
- filter()
- map()
- flapMap()
- groupByKey() 應用於(K,V)鍵值對的資料集時,返回一個新的(K, Iterable)形式的資料集。
- reduceByKey(lambda) 應用於(K,V)鍵值對的資料集時,返回一個新的(K,V)形式的資料集,其中的每個值是將每個key傳遞到函式lambda中進行聚合。
終止操作是真正觸發計算的地方。Spark程式執行到終止操作時,才會執行真正的計算,從檔案中載入資料,完成一次又一次轉換操作,最終,完成行動操作得到結果。
常見的Action操作有:
- count() 返回資料集中的元素個數
- collect() 以陣列的形式返回資料集中的所有元素
- first() 返回資料集中的第一個元素
- take(n) 以陣列的形式返回資料集中的前n個元素
- reduce(lambda) 通過函式lambda聚合資料集中的元素
- foreach(lambda) 將資料集中的每一個元素執行lambda操作
RDD的持久化
在Spark中,RDD採用惰性求解的機制,每次遇到行動操作,都會從頭開始執行計算。每次呼叫行動操作,都會觸發一次從頭開始的計算。這對於迭代計算而言,代價是很大的,迭代計算經常需要多次重複使用一組資料。
可以通過持久化(快取)機制避免這種重複計算的開銷。使用persist()方法對一個RDD標記為持久化。之所以說“標記為持久化”,是因為出現persist()語句的地方,並不會馬上計算生成RDD並把它持久化,而是要等到遇到第一個行動操作觸發,才會把計算結果進行持久化。
持久化後的RDD將會保留在計算節點的記憶體中被後面的行動操作重複使用。
persist()持久化級別引數:
- persist(MEMORY_ONLY):表示將RDD作為反序列化的物件儲存於JVM中,如果記憶體不足,就要按照LRU原則替換快取中的內容。
- persist(MEMORY_AND_DISK)表示將RDD作為反序列化的物件儲存在JVM中,如果記憶體不足,超出的分割槽將會被存放在硬碟上。
- 一般而言,使用cache()方法時,會呼叫persist(MEMORY_ONLY)
- 可以使用unpersist()方法手動地把持久化的RDD從快取中移除。
val list= List("Hadoop", "Spark", "Hive")
val rdd = sc.parallelize(list)
rdd.cache() //回撥用persist(MEMORY_ONLY),但是,語句執行到這裡,並不會快取rdd,這是rdd還沒有被計算生成
println(rdd.count) //第一次行動操作,觸發一次真正從頭到尾的計算,這時才會執行上面的rdd.cache()
print(rdd.collection.mkString(",")) //第二次行動操作,不需要從頭到尾的計算,只需要重複使用上面快取中的rdd
RDD分割槽
RDD是彈性分散式資料集,通常RDD很大,會被分成很多個分割槽,分別儲存在不同的節點上。為什麼要分割槽?
- 增加並行讀
- 減少通訊開銷
在分散式程式中,通訊大代價是很大的,因此控制資料分佈獲得最少的網路傳輸可以極大地提升整體效能。所以對RDD進行分割槽的目的就是減少網路傳輸的代價以提高系統的效能
只有當資料集多次在諸如連線這種基於鍵的操作中使用時,分割槽才會有幫助。若RDD只需要掃描一次,就沒有必要進行分割槽處理。
能從Spark分割槽或獲取的操作有:cogroup()、groupWith()、join()/leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey()已經lookup()
RDD分割槽的一個原則是使得分割槽的個數儘量等於叢集中的CPU核心(core)數目。
對於不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通過設定spark.default.parallelism這個引數的值,來配置預設的分割槽數目,一般而言:
- 本地模式:預設為本地機器的CPU數目,若設定了local[N],則預設為N
- Apache Mesos:預設的分割槽數為8
- Standalone或YARN:在“叢集中所有CPU核心數目總和”和“2”二者中取較大值作為預設值
如何設定分割槽:
- 建立RDD時:在呼叫textFile和parallelize方法時手動指定分割槽個數即可,語法格式:sc.textFile(path, partitionNum)
- 通過轉換操作得到新RDD時:直接呼叫repartition方法即可。
var rdd = data.repartition(4)
rdd2.partitions.size # 4
val array = Array(1,2,3,4,5)
//設定兩個分割槽
val rdd2 = sc,parallelize(array,2)
- 對於parallelize而言,如果沒有在方法中指定分割槽數,則預設為spark.defalut.parallelism
- 對於textFile而言,如果沒有在防止指定分割槽數,則預設為min(defaultParallelism, 2),其中,defaultParallelism對應的就是spark.default.parallelism
- 如果是從HDFS中讀取檔案,則分割槽數為檔案分片數(比如,128MB/片)
共享變數
當Spark在叢集的多個不同節點的多個任務上並行執行一個函式時,它會把函式中涉及到的每個變數,在每個任務上都生成一個副本。但是,有時候需要在多個任務之間共享變數,或者在任務(Task)和任務控制節點(Driver Program)之間共享變數。為了滿足這種需求,Spark提供了兩種型別的變數:廣播變數(broadcast variables)和累加器(accumulators)。廣播變數用來把變數在所有節點的記憶體之間進行共享。累加器則支援在所有不同節點之間進行累計計算(比如計數或者求和)。
廣播變數
廣播變數執行程式開發人員在每個機器上快取一個只讀的變數,而不是為機器上的每個任務都生產一個副本。
Spark的“行動”操作會跨越多個階段(stage),對於每個階段內的所有任務所需要的公共資料,Spark都會自動進行廣播。
可以通過呼叫SparkContext.broadcast(v)來從一個普通變數v中建立一個廣播變數。這個廣播變數就是對普通變數v的一個包裝器,通過呼叫value方法就可以獲得這個廣播變數的值,具體程式碼如下:
val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
- 廣播變數被建立以後,那麼在叢集中的任何函式中,都應該使用廣播變數broadcastVar的值,而不是使用v的值,這樣就不會把v重複分發到這個節點上。
- 一旦廣播變數建立後,普通變數v的值就不能再發生修改,從而確保所有節點都獲得這個廣播變數的相同的值。
import org.apache.spark.SparkConf
import org.apache.spakr.SparkContext
object BroadCastValue {
val conf = new SparkConf().setAppName("BroadCastValue1").setMaster("local[1]")
//獲取SparkContext
val sc = new SparkContext(conf)
//建立廣播變數
val broads = sc.broadcast(3) //變數可以是任意型別
//建立一個測試的List
val lists = List(1,2,3,4,5)
//轉換為rdd(並行化)
val listRDD = sc.parallelize(lists)
//map操作資料
val results = listRDD.map(x=>x*broads.value)
//遍歷結果
results.foreach(x=>println("Thre result is:"+x))
sc.stop
}
累加器
累加器是僅僅被相關操作累加的變數,通常可以被用來實現計數器(counter)和求和(sum)。Spark原生的支援數值型(numeric)的累加器,程式開發人員可以編寫對新型別的支援。
一個數組型的累加器,可以通過呼叫SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()來建立。執行在叢集中的任務,就可以使用add方法來把數值累加到累加器上。但是,這些任務只能做累加操作,不能讀取累加器的值,只有任務控制節點(Driver Program)可以使用value方法來讀取累加器的值。
下面是累加器對一個數組元素進行求和的示例:
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))
accum.value