
Spark: RDD Persistence

 

Official documentation:

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
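A minimal Java sketch of what this looks like (sc is an existing JavaSparkContext and the input path is a placeholder):

JavaRDD<String> lines = sc.textFile("data.txt");  // "data.txt" is a placeholder path
lines.cache();             // only marks the RDD for caching; nothing is computed yet
long n1 = lines.count();   // first action: computes the partitions and caches them
long n2 = lines.count();   // second action: reuses the cached partitions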

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

  • MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

  • MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

  • MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

  • MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

  • DISK_ONLY: Store the RDD partitions only on disk.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.

  • OFF_HEAP (experimental): Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off-heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.
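For example, a minimal Java sketch of selecting one of these levels explicitly (assuming an existing JavaSparkContext sc; the input path is a placeholder):

import org.apache.spark.storage.StorageLevel;

JavaRDD<String> lines = sc.textFile("data.txt");
lines.persist(StorageLevel.MEMORY_AND_DISK_SER());  // serialized in memory, spilling to disk
// cache() is equivalent to persist(StorageLevel.MEMORY_ONLY())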

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level.

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.
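Following that recommendation, a hedged sketch of persisting the output of reduceByKey before reusing it (the word-count shape and variable names are illustrative; lines is an existing JavaRDD<String>):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

JavaPairRDD<String, Integer> counts = lines
        .mapToPair(w -> new Tuple2<>(w, 1))
        .reduceByKey(Integer::sum);
counts.persist(StorageLevel.MEMORY_ONLY());  // cache the shuffle output we intend to reuse
long distinct = counts.count();              // first action computes counts and caches it
counts.take(10);                             // subsequent action is served from the cache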

Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access (see the Kryo sketch after this list).

  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.

  • In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages:

    • It allows multiple executors to share the same pool of memory in Tachyon.
    • It significantly reduces garbage collection costs.
    • Cached data is not lost if individual executors crash.
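
As mentioned in the MEMORY_ONLY_SER bullet above, pairing a serialized level with a fast serializer pays off. A minimal sketch of enabling Kryo through the standard spark.serializer configuration key:

SparkConf conf = new SparkConf()
        .setAppName("Persist")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext sc = new JavaSparkContext(conf);
// RDDs persisted with a *_SER level are now serialized with Kryo
// instead of the default (and bulkier) Java serialization.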

 


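The Java example below, from the original post, demonstrates the effect of cache(): the same RDD is counted twice, and the second count() should be much faster because it is served from memory rather than recomputed from the file.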
package cn.rzlee.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @Author ^_^
 * @Create 2018/11/3
 */
public class Persist {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Persist").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // cache() must be called before the first action so that the
        // partitions are kept in memory once they have been computed.
        JavaRDD<String> lines = sc.textFile("C:\\Users\\txdyl\\Desktop\\log\\in\\data.txt", 1).cache();

        // First count(): reads the file, computes the RDD and fills the cache.
        long beginTime = System.currentTimeMillis();
        long count = lines.count();
        System.out.println(count);
        long endTime = System.currentTimeMillis();
        System.out.println("cost " + (endTime - beginTime) + " milliseconds");

        // Second count(): served from the in-memory cache, so it should be much faster.
        beginTime = System.currentTimeMillis();
        count = lines.count();
        System.out.println(count);
        endTime = System.currentTimeMillis();
        System.out.println("cost " + (endTime - beginTime) + " milliseconds");

        sc.close();
    }
}