1. 程式人生 > >spark中的rdd的持久化

spark中的rdd的持久化

rdd的全稱為Resilient Distributed Datasets(彈性分散式資料集)

rdd的操作有兩種transfrom和action。

transfrom並不引發真正的rdd計算,action才會引發真正的rdd計算。

rdd的持久化是便於rdd計算的重複使用。

官方的api說明如下:

persist(storageLevel=StorageLevel(FalseTrueFalseFalse1))

Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY_SER

)

在rdd參與第一次計算後,設定rdd的儲存級別可以保持rdd計算後的值在記憶體中。(1)
另外,只有未曾設定儲存級別的rdd才能設定儲存級別,設定了儲存級別的rdd不能修改其儲存級別。(2)
(1)的舉例如下:
rdd1要經過transform1得到rdd2,然後在一個迴圈L內rdd2進行transform2和action1。
由於trasform操作是不會真正執行的,所以rdd1執行transform1需要在迴圈L第一次迴圈的時候觸發。
如果設定了rdd1的儲存級別,那麼迴圈L的第二次迴圈起,只需要從rdd2開始計算就好了,而不用向第一次迴圈時從rdd1開始計算。
rdd的持久化操作有cache()和presist()函式這兩種方式。

----------------------------------------------------------------------------------------------------------------------

Spark最重要的一個功能,就是在不同操作間,持久化(或快取)一個數據集在記憶體中。當你持久化一個RDD,每一個結點都將把它的計算分塊結果儲存在記憶體中,並在對此資料集(或者衍生出的資料集)進行的其它動作中重用。這將使得後續的動作(Actions)變得更加迅速(通常快10倍)。快取是用Spark構建迭代演算法的關鍵。
你可以用persist()或cache()方法來標記一個要被持久化的RDD,然後一旦首次被一個動作(Action)觸發計算,它將會被保留在計算結點的記憶體中並重用。Cache有容錯機制,如果RDD的任一分割槽丟失了,通過使用原先建立它的轉換操作,它將會被自動重算(不需要全部重算,只計算丟失的部分)。當需要刪除被持久化的RDD,可以用unpersistRDD()來完成該工作。
此外,每一個RDD都可以用不同的儲存級別進行儲存,從而允許你持久化資料集在硬碟,或者在記憶體作為序列化的Java物件(節省空間),甚至於跨結點複製。這些等級選擇,是通過將一個org.apache.spark.storage.StorageLevel物件傳遞給persist()方法進行確定。cache()方法是使用預設儲存級別的快捷方法,也就是StorageLevel.MEMORY_ONLY(將反序列化的物件存入記憶體)。
StorageLevel有五個屬性,分別是:useDisk_是否使用磁碟,useMemory_是否使用記憶體,useOffHeap_是否使用堆外記憶體如:Tachyon,deserialized_是否進行反序列化,replication_備份數目。
完整的可選儲存級別如下:

Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.
儲存級別的選擇
Spark的不同儲存級別,旨在滿足記憶體使用和CPU效率權衡上的不同需求。我們建議通過以下的步驟來進行選擇:
•如果你的RDDs可以很好的與預設的儲存級別(MEMORY_ONLY)契合,就不需要做任何修改了。這已經是CPU使用效率最高的選項,它使得RDDs的操作儘可能的快。•如果不行,試著使用MEMORY_ONLY_SER並且選擇一個快速序列化的庫使得物件在有比較高的空間使用率的情況下,依然可以較快被訪問。

儘可能不要儲存到硬碟上,除非計算資料集的函式,計算量特別大,或者它們過濾
了大量的資料。否則,重新計算一個分割槽的速度,和與從硬碟中讀取基本差不多快。
總結:呼叫persist()或cache()方法使用的是MEMORY_ONLY儲存級別,對於廣播變數,使用的是MEMORY_AND_DISK儲存級別。如果想使用其他儲存級別,可以呼叫persist(StroageLevel)。MEMORY_AND_DISK儲存級別時當記憶體足夠時直接儲存到記憶體佇列中,當記憶體不足時,將釋放掉不屬於同一個RDD的block的記憶體。