Spark之持久化與儲存級別
一、持久化cache()與persist()
根據spark官方文件,Spark 中一個很重要的能力是將資料持久化(或稱為快取),在多個操作間都可以訪問這些持久化的資料。當持久化一個 RDD 時,每個節點的其它分割槽都可以使用 RDD 在記憶體中進行計算,在該資料上的其他 action 操作將直接使用記憶體中的資料。這樣會讓以後的 action 操作計算速度加快(通常執行速度會加速 10 倍)。快取是迭代演算法和快速的互動式使用的重要工具。
RDD 可以使用 persist() 方法或 cache() 方法進行持久化。資料將會在第一次 action 操作時進行計算,並快取在節點的記憶體中,這裡可以看出cache()是lazy的,但是在spark sql中,cache是立即執行的,是非lazy的,這裡需要注意
在 shuffle 操作中(例如 reduceByKey),即便是使用者沒有呼叫 persist 方法,Spark 也會自動快取部分中間資料。這麼做的目的是,在 shuffle 的過程中某個節點執行失敗時,不需要重新計算所有的輸入資料。如果使用者想多次使用某個 RDD,強烈推薦在該 RDD 上呼叫 persist 方法。
差別:
根據spark原始碼,cache方法呼叫的是persist()無參,而persist()呼叫的是它下面的persist(StorageLevel.MEMORY_ONLY)方法。預設的儲存級別為MEMORY_ONLY。cache()只有這個儲存級別,是persist()的一個簡化應用。後面將會接收儲存級別。
cache()原始碼:
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
persist()原始碼:
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
建立快取
在spark-shell中,執行如下命令
val line = sc.textFile("file:///home/hadoop/test/test.txt")
line.cache
line.count //觸發action
然後到spark web頁面檢視,可以在Storage中看到快取資訊。
再次注意:cache()是lazy操作,必須觸發action操作才能執行。
刪除快取
Spark 自動監控各個節點上的快取使用率,並以最近最少使用的方式(LRU)將舊資料塊移除記憶體。如果想手動移除一個 RDD,而不是等待該 RDD 被 Spark 自動移除,可以使用 RDD.unpersist() 方法
line.unpersist()
二、StorageLevel儲存級別
這是快取的儲存級別,快取可以存到記憶體,磁碟,也可以儲存兩份。就是根據StorageLevel指定的。
原始碼:
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
從上面可以看到,class StorageLevel 有5個引數,
1、_useDisk: Boolean 是否使用磁碟
2、_useMemory: Boolean 是否使用記憶體
3、_useOffHeap: Boolean 是否使用堆外存
4、_deserialized: Boolean 是否使用反序列化,其逆過程式列化(Serialization)是java提供的一種機制,將物件表示成一連串的位元組;而反序列化就表示將位元組恢復為物件的過程。序列化是物件永久化的一種機制,可以將物件及其屬性儲存起來,並能在反序列化後直接恢復這個物件
5、_replication: Int = 1 副本個數,這裡有預設引數為1
然後object StorageLevel就是根據class StorageLevel傳入Boolean指定這5個引數。persist()預設使用
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
三、如何選擇儲存級別
Spark 的儲存級別的選擇,核心問題是在記憶體使用率和 CPU 效率之間進行權衡。建議按下面的過程進行儲存級別的選擇 :
1、如果使用預設的儲存級別(MEMORY_ONLY),儲存在記憶體中的 RDD 沒有發生溢位,那麼就選擇預設的儲存級別。預設儲存級別可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度執行。
2、如果記憶體不能全部儲存 RDD,那麼使用 MEMORY_ONLY_SER,並挑選一個快速序列化庫將物件序列化,以節省記憶體空間。使用這種儲存級別,計算速度仍然很快。
3、除了在計算該資料集的代價特別高,或者在需要過濾大量資料的情況下,儘量不要將溢位的資料儲存到磁碟。因為,重新計算這個資料分割槽的耗時與從磁碟讀取這些資料的耗時差不多。
4、如果想快速還原故障,建議使用多副本儲存級別(例如,使用 Spark 作為 web 應用的後臺服務,在服務出故障時需要快速恢復的場景下)。所有的儲存級別都通過重新計算丟失的資料的方式,提供了完全容錯機制。但是多副本級別在發生資料丟失時,不需要重新計算對應的資料庫,可以讓任務繼續執行。