1. 程式人生 > >RDD與共享變數

RDD與共享變數

RDD和共享變數是Spark中的兩個重要抽象。

RDD

彈性分散式資料集, 是分散式記憶體的一個抽象概念,RDD提供了一種高度受限的共享記憶體模型,即RDD是隻讀的記錄分割槽的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而建立,然而這些限制使得實現容錯的開銷很低。

RDD的建立

  • 從檔案系統中載入資料建立
  • 通過並行集合(資料)建立
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)
//也可從列表中建立
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)

RDD轉換和Action操作

對於RDD而言,每一次轉換操作都會產生不同的RDD,供給下一個“轉換”使用。轉換得到的RDD是惰性求值的,也就是說,整個轉換過程只是記錄的轉換的軌跡,並不會發生真正的計算,只有遇到行動操作時,才會發生真正的計算,從血緣關係源頭開始,進行物理的轉換操作。

  • filter()
  • map()
  • flapMap()
  • groupByKey() 應用於(K,V)鍵值對的資料集時,返回一個新的(K, Iterable)形式的資料集。
  • reduceByKey(lambda) 應用於(K,V)鍵值對的資料集時,返回一個新的(K,V)形式的資料集,其中的每個值是將每個key傳遞到函式lambda中進行聚合。

終止操作是真正觸發計算的地方。Spark程式執行到終止操作時,才會執行真正的計算,從檔案中載入資料,完成一次又一次轉換操作,最終,完成行動操作得到結果。

常見的Action操作有:

  • count() 返回資料集中的元素個數
  • collect() 以陣列的形式返回資料集中的所有元素
  • first() 返回資料集中的第一個元素
  • take(n) 以陣列的形式返回資料集中的前n個元素
  • reduce(lambda) 通過函式lambda聚合資料集中的元素
  • foreach(lambda) 將資料集中的每一個元素執行lambda操作

RDD的持久化

在Spark中,RDD採用惰性求解的機制,每次遇到行動操作,都會從頭開始執行計算。每次呼叫行動操作,都會觸發一次從頭開始的計算。這對於迭代計算而言,代價是很大的,迭代計算經常需要多次重複使用一組資料。

可以通過持久化(快取)機制避免這種重複計算的開銷。使用persist()方法對一個RDD標記為持久化。之所以說“標記為持久化”,是因為出現persist()語句的地方,並不會馬上計算生成RDD並把它持久化,而是要等到遇到第一個行動操作觸發,才會把計算結果進行持久化。

持久化後的RDD將會保留在計算節點的記憶體中被後面的行動操作重複使用。

persist()持久化級別引數:

  • persist(MEMORY_ONLY):表示將RDD作為反序列化的物件儲存於JVM中,如果記憶體不足,就要按照LRU原則替換快取中的內容。
  • persist(MEMORY_AND_DISK)表示將RDD作為反序列化的物件儲存在JVM中,如果記憶體不足,超出的分割槽將會被存放在硬碟上。
  • 一般而言,使用cache()方法時,會呼叫persist(MEMORY_ONLY)
  • 可以使用unpersist()方法手動地把持久化的RDD從快取中移除。
val list= List("Hadoop", "Spark", "Hive")
val rdd = sc.parallelize(list)
rdd.cache() //回撥用persist(MEMORY_ONLY),但是,語句執行到這裡,並不會快取rdd,這是rdd還沒有被計算生成
println(rdd.count) //第一次行動操作,觸發一次真正從頭到尾的計算,這時才會執行上面的rdd.cache()
print(rdd.collection.mkString(",")) //第二次行動操作,不需要從頭到尾的計算,只需要重複使用上面快取中的rdd

RDD分割槽

RDD是彈性分散式資料集,通常RDD很大,會被分成很多個分割槽,分別儲存在不同的節點上。為什麼要分割槽?

  • 增加並行讀
  • 減少通訊開銷

在分散式程式中,通訊大代價是很大的,因此控制資料分佈獲得最少的網路傳輸可以極大地提升整體效能。所以對RDD進行分割槽的目的就是減少網路傳輸的代價以提高系統的效能

只有當資料集多次在諸如連線這種基於鍵的操作中使用時,分割槽才會有幫助。若RDD只需要掃描一次,就沒有必要進行分割槽處理。

能從Spark分割槽或獲取的操作有:cogroup()、groupWith()、join()/leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey()已經lookup()

RDD分割槽的一個原則是使得分割槽的個數儘量等於叢集中的CPU核心(core)數目。
對於不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通過設定spark.default.parallelism這個引數的值,來配置預設的分割槽數目,一般而言:

  • 本地模式:預設為本地機器的CPU數目,若設定了local[N],則預設為N
  • Apache Mesos:預設的分割槽數為8
  • Standalone或YARN:在“叢集中所有CPU核心數目總和”和“2”二者中取較大值作為預設值

如何設定分割槽:

  1. 建立RDD時:在呼叫textFile和parallelize方法時手動指定分割槽個數即可,語法格式:sc.textFile(path, partitionNum)
  2. 通過轉換操作得到新RDD時:直接呼叫repartition方法即可。
var rdd = data.repartition(4)
rdd2.partitions.size # 4

val array = Array(1,2,3,4,5)
//設定兩個分割槽
val rdd2 = sc,parallelize(array,2) 
  • 對於parallelize而言,如果沒有在方法中指定分割槽數,則預設為spark.defalut.parallelism
  • 對於textFile而言,如果沒有在防止指定分割槽數,則預設為min(defaultParallelism, 2),其中,defaultParallelism對應的就是spark.default.parallelism
  • 如果是從HDFS中讀取檔案,則分割槽數為檔案分片數(比如,128MB/片)

共享變數

當Spark在叢集的多個不同節點的多個任務上並行執行一個函式時,它會把函式中涉及到的每個變數,在每個任務上都生成一個副本。但是,有時候需要在多個任務之間共享變數,或者在任務(Task)和任務控制節點(Driver Program)之間共享變數。為了滿足這種需求,Spark提供了兩種型別的變數:廣播變數(broadcast variables)和累加器(accumulators)。廣播變數用來把變數在所有節點的記憶體之間進行共享。累加器則支援在所有不同節點之間進行累計計算(比如計數或者求和)。

廣播變數

廣播變數執行程式開發人員在每個機器上快取一個只讀的變數,而不是為機器上的每個任務都生產一個副本。

Spark的“行動”操作會跨越多個階段(stage),對於每個階段內的所有任務所需要的公共資料,Spark都會自動進行廣播。

可以通過呼叫SparkContext.broadcast(v)來從一個普通變數v中建立一個廣播變數。這個廣播變數就是對普通變數v的一個包裝器,通過呼叫value方法就可以獲得這個廣播變數的值,具體程式碼如下:

val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
  • 廣播變數被建立以後,那麼在叢集中的任何函式中,都應該使用廣播變數broadcastVar的值,而不是使用v的值,這樣就不會把v重複分發到這個節點上。
  • 一旦廣播變數建立後,普通變數v的值就不能再發生修改,從而確保所有節點都獲得這個廣播變數的相同的值。
import org.apache.spark.SparkConf
import org.apache.spakr.SparkContext

object BroadCastValue {
    val conf = new SparkConf().setAppName("BroadCastValue1").setMaster("local[1]")
    //獲取SparkContext
    val sc = new SparkContext(conf)
    //建立廣播變數
    val broads = sc.broadcast(3) //變數可以是任意型別
    //建立一個測試的List
    val lists = List(1,2,3,4,5)
    //轉換為rdd(並行化)
    val listRDD = sc.parallelize(lists)
    //map操作資料
    val results = listRDD.map(x=>x*broads.value)
    //遍歷結果
    results.foreach(x=>println("Thre result is:"+x))
    sc.stop
}

累加器

累加器是僅僅被相關操作累加的變數,通常可以被用來實現計數器(counter)和求和(sum)。Spark原生的支援數值型(numeric)的累加器,程式開發人員可以編寫對新型別的支援。

一個數組型的累加器,可以通過呼叫SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()來建立。執行在叢集中的任務,就可以使用add方法來把數值累加到累加器上。但是,這些任務只能做累加操作,不能讀取累加器的值,只有任務控制節點(Driver Program)可以使用value方法來讀取累加器的值。

下面是累加器對一個數組元素進行求和的示例:

val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))
accum.value