1. 程式人生 > >RDD持久化原理與共享變數

RDD持久化原理與共享變數

RDD 持久化工作原理

  • Spark非常重要的一個功能特性就是可以將RDD 持久化在記憶體中,當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到記憶體中,並且在之後對該RDD的反覆使用中,直接使用記憶體快取的partition,這樣的話,對於針對一個RDD反覆執行多個操作的場景,就只要對RDD計算一次即可,後面直接使用該RDD ,而不需要計算多次該RDD
  • 巧妙使用RDD持久化,甚至在某些場景下,可以將spark應用程式的效能提升10倍。對於迭代式演算法和快速互動式應用來說,RDD持久化,是非常重要的。
  • 要持久化一個RDD,只要呼叫其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接快取在每個節點中。而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丟失了,那麼Spark會自動通過其源RDD,使用transformation操作重新計算該partition。
  • cache()和persist()的區別在於,cache()是persist()的一種簡化方式,cache()的底層就是呼叫的persist()的無參版本,同時就是呼叫persist(MEMORY_ONLY),將資料持久化到記憶體中。如果需要從記憶體中去除快取,那麼可以使用unpersist()方法。

RDD持久化使用場景

  • 1、第一次載入大量的資料到RDD中
  • 2、頻繁的動態更新RDD Cache資料,不適合使用Spark Cache、Spark lineage

RDD 持久化的策略

如何選擇一種最合適的持久化策略?

  • 預設情況下,效能最高的當然是MEMORY_ONLY,但前提是你的記憶體必須足夠足夠大,可以綽綽有餘地存放下整個RDD的所有資料。因為不進行序列化與反序列化操作,就避免了這部分的效能開銷;對這個RDD的後續運算元操作,都是基於純記憶體中的資料的操作,不需要從磁碟檔案中讀取資料,效能也很高;而且不需要複製一份資料副本,並遠端傳送到其他節點上。但是這裡必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中資料比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM記憶體溢位異常。
  • 如果使用MEMORY_ONLY級別時發生了記憶體溢位,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD資料序列化後再儲存在記憶體中,此時每個partition僅僅是一個位元組陣列而已,大大減少了物件數量,並降低了記憶體佔用。這種級別比MEMORY_ONLY多出來的效能開銷,主要就是序列化與反序列化的開銷。但是後續運算元可以基於純記憶體進行操作,因此效能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的資料量過多的話,還是可能會導致OOM記憶體溢位的異常。
  • 如果純記憶體的級別都無法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的資料量很大,記憶體無法完全放下。序列化後的資料比較少,可以節省記憶體和磁碟的空間開銷。同時該策略會優先儘量嘗試將資料快取在記憶體中,記憶體快取不下才會寫入磁碟。
  • 通常不建議使用DISK_ONLY和字尾為_2的級別:因為完全基於磁碟檔案進行資料的讀寫,會導致效能急劇降低,有時還不如重新計算一次所有RDD。字尾為_2的級別,必須將所有資料都複製一份副本,併發送到其他節點上,資料複製以及網路傳輸會導致較大的效能開銷,除非是要求作業的高可用性,否則不建議使用。

RDD進行持久化和不進行持久化的區別

RDD持久化程式碼

public class PersistApp {
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName(PersistApp.class.getSimpleName()).setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> linesRDD = sc.textFile("E:\\test\\scala\\access_2016-05-30.log");
    linesRDD.cache();

    long start = System.currentTimeMillis();
    List<String> list = linesRDD.take(10);
    long end = System.currentTimeMillis();
    System.out.println("first times cost" + (end - start) + "ms");
    System.out.println("-----------------------------------");
    start = System.currentTimeMillis();
    long count = linesRDD.count();
    end = System.currentTimeMillis();
    System.out.println("second times cost" + (end - start) + "ms");
    sc.close();
 }
}

共享變數

  • 通常情況下,當向Spark操作(如map,reduce)傳遞一個函式時,它會在一個遠端叢集節點上執行,它會使用函式中所有變數的副本。這些變數被複制到所有的機器上,遠端機器上並沒有被更新的變數會向驅動程式回傳。在任務之間使用通用的,支援讀寫的共享變數是低效的。 儘管如此,Spark提供了兩種有限型別的共享變數,廣播變數和累加器。

廣播變數

  • Spark的另一種共享變數是廣播變數。通常情況下,當一個RDD的很多操作都需要使用driver中定義的變數時,每次操作,driver都要把變數傳送給worker節點一次,如果這個變數中的資料很大的話,會產生很高的傳輸負載,導致執行效率降低。使用廣播變數可以使程式高效地將一個很大的只讀資料傳送給多個worker節點,而且對每個worker節點只需要傳輸一次,每次操作時executor可以直接獲取本地儲存的資料副本,不需要多次傳輸。

object BroadCastApp {
def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BroadCastApp")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 4, 6, 0, 9)
    val set = mutable.HashSet[Int]()
    val num = 7
    val bset = sc.broadcast(set)
    val bNum = sc.broadcast(7)
    val listRDD = sc.parallelize(list)
    listRDD.map(x => {
        bset.value.+=(x)
        x * bNum.value
    }).foreach(x => print(x + " "))
    println("----------------------")
    for (s <- set) {
        println(s)
    }
    sc.stop()
    }
}

建立並使用廣播變數的過程如下:

在一個型別T的物件obj上使用SparkContext.brodcast(obj)方法,建立一個Broadcast[T]型別的廣播變數,obj必須滿足Serializable。
通過廣播變數的.value()方法訪問其值。
另外,廣播過程可能由於變數的序列化時間過程或者序列化變數的傳輸過程過程而成為瓶頸,而Spark Scala中使用的預設的Java序列化方法通常是低效的,因此可以通過spark.serializer屬性為不同的資料型別實現特定的序列化方法(如Kryo)來優化這一過程。

累加器

Spark提供的Accumulator,主要用於多個節點對一個變數進行共享性的操作。Accumulator只提供了累加的功能。但是確給我們提供了多個task對一個變數並行操作的功能。但是task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程式可以讀取Accumulator的值。

object AccumulatorApp {
def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccumulatorApp").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 4, 6, 0, 9)
    val listRDD = sc.parallelize(list)
    val acc = sc.accumulator(0)
    list.map(x => {
        /**
          * 在這裡只能對累加器進行寫的操作,不能進行讀的操作
          * count-->action
          * 主要是可以替代直接使用count來統計某一個transformation執行的資料量,
          * 因為count是一個action,一旦執行了action操作,前面rdd partition中資料會被釋放掉
          * 這樣要想在進行其他的操作,就需要重新載入計算資料,會是spark程式效能降低
          */
        acc.add(1)
        (x, 1)
    })
    println("累加結果: " + acc.value)
    sc.stop()

 }
}