Spark(三)Spark RDD程式設計
目錄:
3、RDD程式設計
Spark對資料的核心抽象—彈性分散式資料集(Resilient Distributed Dataset,簡稱 RDD)。RDD其實就是分散式的元素集合,是一個容錯的、並行的資料結構,可以讓使用者顯示的將資料儲存到磁碟和記憶體中,並能控制資料的分割槽。
在Spark中,對資料的所有操作不外乎建立RDD、轉化已有RDD以及呼叫RDD操作進行求值。同時,Spark會自動將RDD中的資料分發到叢集上,並將操作並行化執行。
3.1、RDD基礎
Spark中的RDD就是一個不可變的分散式物件集合。每個RDD都被分為多個分割槽,這些分割槽執行在叢集中的不同節點上。RDD可以包含Python、Java、Scale中任意型別的物件,甚至可以包含使用者自定義的物件。
Spark提供了兩種方法建立RDD:讀取一個外部資料集,或在驅動器程式裡分發驅動器程式中的物件集合(比如list和set)。
RDD的特性:
- 它是不變的資料結構儲存
- 它是支援跨叢集的分散式資料結構
- 可以根據資料記錄的key對結構進行分割槽
- 提供了粗粒度的操作,且這些操作都支援分割槽
- 它將資料儲存在記憶體中,從而提供了低延遲性
Spark程式或shell會話步驟:
- 從外部資料創建出輸入RDD。
- 使用諸如filter()這樣的轉化操作對RDD進行轉化,以定義新的RDD。
- 告訴Spark對需要被重用的中間結果RDD執行persist()操作。
- 使用行動操作(例如count()和first()等)來觸發一次平行計算,Spark會對計算進行優化後再執行。
3.2、建立RDD
建立RDD最簡單的方式就是把程式中一個已有的集合傳給SparkContext的parallelize()方法。適用於開發原型和測試時,這種方式需要把你的整個資料集先放在一臺機器的記憶體中。
1、Java中的parallelize()方法:
JavaRDD<String> lines = sc.parallelize(Arrays.asList(“pandas, “I like pandas”));
更常用的方式是從外部儲存中讀取資料來建立 RDD。
2、Java中的textFile方法:
JavaRDD(String) lines = sc.textFile(“README.md”);
3.3、RDD操作
RDD支援兩種操作:轉化操作和行動操作。
RDD的轉化操作是返回一個新的RDD的操作,比如map()和filter(),而行動操作則是向驅動器程式返回結果或把結果寫入外部系統的操作,會觸發實際的計算,比如count()和first()。
其實,轉化操作返回的是RDD,而行動操作返回的是其他的資料型別。
3.3.1、轉化操作
RDD的轉化操作時返回新的RDD的操作。轉化出來的RDD是惰性求值的,只有在行動操作中用到這些RDD時才會被計算。許多轉化操作都是針對各個元素的,也就是說,這些轉化操作每次只會操作RDD中的一個元素。不過並不是所有的轉化操作都是這樣的。
例如:假定我們有一個日誌檔案log.txt,內含有若干訊息,希望選出其中的錯誤訊息。
1、用Scale實現filter()轉化操作:
val inputRDD = sc.textFile(“log.txt”)
val errorRDD = inputRDD.filter(line => line.contains(“error”))
2、用Java實現filter()轉化操作
JavaRDD<String> inputRDD = sc.textFile(“log.txt”);
JavaRDD<String> errorRDD = inputRDD.filter(new Function<String,Boolean>() {
@Override
public Boolean call(String x) {
return x.contains(“error”);
}
});
3、Java進行union()轉化操作
JavaRDD<String> badLinesRDD = errorRDD.union(inputRDD);
通過轉化操作,已經從RDD中派生出新的RDD,Spark會使用譜系圖(lineage graph)來記錄這些不同RDD之間的依賴關係。Spark需要用這些資訊來按需要計算每個RDD,也可以依靠譜系圖在持久化的RDD丟失部分資料時恢復所丟失的資料。
——日誌分析過程中創建出的RDD譜系圖
3.3.2、行動操作
有時我們還需要對資料集進行實際的計算。行動操作是第二種型別的RDD操作,他們會把最終求得的結果返回到驅動器程式,或者寫入外部儲存系統中。由於行動操作需要生成實際的輸出,他們會強制執行那些求值必須用到的RDD的轉換操作。
1、在Scale中使用行動操作對錯誤進行計數:
println("Input had "+ badLinesRDD.count() + " concerning lines")
println("Here are 10 examples: ")
badLines.take(10).foreach(println)
2、在Java中使用行動操作對錯誤進行計數:
System.out.println("Input had " + badLinesRDD.count() + "concerning lines");
System.out.println("Here are 10 examples: ")
for(String line : badLinesRDD.take(10)) {
System.out.println(line);
}
在這個例子中,我們在驅動器程式中使用take()獲取RDD中的少量元素。然後再本地遍歷這些元素,並把驅動器端打印出來。RDD還有一個collect()函式,可以用來獲取整個RDD中的資料。只有你的整個資料集能在單臺機器的記憶體中放得下時,才能使用collect(),因此,collect()不能應用在大規模的資料集上。
3.3.3、惰性求值
RDD的轉化操作時惰性求值的。這意味著當我們對RDD呼叫轉化操作時,操作是不會立即執行。相反,Spark 會在內部記錄下所要求執行的操作的相關資訊。我們不應該把 RDD 看作存放著特定資料的資料集,而最好把每個 RDD 當作我們通過轉化操作構建出來的、記錄如何計算資料的指令列表。
3.4、向Spark傳遞函式
Spark的大部分轉化操作和一部分行動操作,都需要依賴使用者傳遞的函式來計算。在 Java 中,函式需要作為實現了 Spark 的 org.apache.spark.api.java.function 包中的任一函式介面的物件來傳遞。根據不同的返回型別,我們定義了一些不同的介面。
函式名 |
實現的方法 |
用途 |
Function<T, R> |
R call(T) |
接收一個輸入值並返回一個輸出值,用於類似map()和filter()等操作中 |
Function2<T1, T2, R> |
R call(T1, T2) |
接收兩個輸入值並返回一個輸出值,用於類似aggregate()和fold()等操作 |
FlatMapFunction<T, R> |
Iterator<R> call(T) |
接收一個輸入值並返回任意輸出,用於類似flatMap()這樣的操作 |
可以把我們的函式類內聯定義為使用匿名內部類,也可以建立一個具名類。
1、使用匿名內部類進行函式傳遞
JavaRDD<String> errors = lines.filter(new Function<String, Boolean>() {
public Boolean call(String x) {
return x.contains("error");
}
});
2、使用具名類進行函式傳遞
class ContainsError implements Function<String, Boolean>() {
public Boolean call(String x) {
return x.contains("error");
}
}
JavaRDD<String> errors = lines.filter(new ContainsError());
3、帶引數的Java函式類
class Contains implements Function<String, Boolean>() {
private String query;
public Contains(String query) {
this.query = query;
}
public Boolean call(String x) {
return x.contains(query);
}
}
RDD<String> errors = lines.filter(new Contains("error"));
在Java8中可以使用lambda表示式來簡潔地實現函式介面。
4、使用Java8的lambda表示式進行函式傳遞
JavaRDD<String> errors = lines.filter(s -> s.contains("error"));
3.5、常見的轉化操作和行動操作
3.5.1、基本RDD
1、針對各個元素的轉化操作:
最常用的轉化操作時map()和filter()。 轉化操作map()接收一個函式,把這個函式用於RDD中的每個元素,將函式的返回結果作為結果RDD中對應元素的值。而轉化操作filter()接收一個函式,並將RDD中滿足該函式的元素放入新的RDD中返回。
我們可以使用map()來做各種各樣的事情:可以把我們的URL集合中的URL對應的主機名提取出來,也可以簡單到只對各個陣列求平方值。map()的返回值型別不需要和輸入型別一樣。這樣如果有一個字串RDD,並且我們的map()函式是用來把字串解析並返回一個Double值的,那麼此時我們的輸入RDD型別就是RDD[String],而輸出型別是RDD[Double]。
1)使用map()計算RDD中各值的平方:
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>(){
@Override
public Integer call(Integer x) throws Exception {
return x*x;
}
});
System.out.println(StringUtils.join(result.collect(),","));
2)使用flatMap()將行資料切分為單詞
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello word","hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
words.first(); //返回"hello"
3)RDD的flatMap()和map的區別
2、偽集合操作:
RDD中最常缺失的集合屬性是元素的唯一性,因為常常有重複的元素。如果只要唯一的元素,我們可以使用RDD.distinc()轉化操作來生成一個只包含不同元素的RDD。不過distinct()操作的開銷很大,因為它需要將所有的資料經過網路混洗(shuffle),以確保每個元素都只有一份。
最簡單的集合操作時union(other),它會返回一個包含兩個RDD中的所有元素的RDD。比如在處理來自多個數據源的日誌檔案很有用。與數學中的union()不同的是,如果輸入的RDD中有重複資料,Spark的uniion()操作也會包含這些重複資料。
Spark還提供了intersection(other)方法,只返回兩個RDD中都有的元素。intersection()在執行時也會去掉重複的元素(單個RDD內的重複元素也會一起移除)。intersection()需要通過網路混洗資料來發現共有的資料,因此效能吧union()要差很多。
substract(other)函式接收另一個RDD作為引數,返回一個由只存在於第一個RDD中而不存在於第二個RDD中的所有 元素組成的RDD。也需要資料混洗。
cartesian(other)轉化操作會返回所有可能的(a,b)對,其中a是源RDD中的元素,而b則來自於另一個RDD。用於計算兩個RDD的笛卡爾積。
表3-5-1:對一個數據為{1, 2, 3, 3}的RDD進行基本的RDD轉化操作
函式名 |
目的 |
示例 |
結果 |
map() |
將函式應用於RDD中的每個元素,將返回值構成新的RDD |
rdd.map(x => x + 1) |
{2, 3, 4, 4} |
flatMap() |
將函式應用於RDD中的每個元素,將返回的迭代器的所有內容構成新的RDD。通常用來切分單詞 |
rdd.flatMap(x => x.to(3) |
{1, 2, 3, 2, 3, 3, 3} |
filter() |
返回一個有通過傳給filter()的函式的元素組成的RDD |
rdd.filter(x => x != 1) |
{2, 3, 3} |
distinct() |
去重 |
rdd.distinct() |
{1, 2, 3} |
sample(withReplacement, fraction, [seed]) |
對RDD取樣,以及是否替換 |
rdd.sample(false, 0.5) |
非確定的 |
表3-5-2:對資料分別為{1, 2, 3}和{3, 4, 5}的RDD進行鍼對兩個RDD的轉化操作
函式名 |
目的 |
示例 |
結果 |
union() |
生成一個包含兩個RDD中的所有元素的RDD |
rdd.union(other) |
{1, 2, 3, 3, 4, 5} |
intersection() |
求兩個RDD共同的元素的RDD |
rdd.intersection(other) |
{3} |
subtract() |
移除一個RDD中的內容(例如移除訓練資料) |
rdd.subtract(other) |
{1, 2} |
cartesian() |
與另一個RDD的笛卡爾積 |
rdd.cartesian(other) |
{(1, 3) ,(1, 4), …(3, 5)} |
3、行動操作:
最常見的行動操作reduce()。它接收一個函式作為引數,這個函式要操作兩個RDD的元素型別的資料並返回一個同樣型別的新元素。
1) Java中的reduce()函式
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer x, Integer y) throws Exception {
return x + y;
}
});
2) Java中的aggregate()函式
表3-5-3:對資料為{1, 2, 3, 3}的RDD進行基本的RDD操作
3.5.2、在不同RDD型別間轉換
在Java中,各種RDD的特殊型別間的轉換更為明確。Java中有兩個專門的類JavaDoubleRDD和JavaPairRDD ,來處理特殊型別的RDD。要構建出這些特殊型別的RDD,需要使用特殊版本的類來替代一般使用的Function類。如果要從T型別的RDD創建出一個DoubleRDD,我們就應當在對映操作中使用DoubleFunction<T>來替代Function<T, Double>。
此外,我們也需要呼叫RDD上的一些別的函式(因此不能只是創建出一個DoubleFunction然後把它傳給map())。當需要一個DoubleRDD時,我們應當呼叫mapToDouble()來替代map()。
表3-5-4:Java中針對專門型別的函式介面
函式名 |
等價函式 |
用途 |
DoubleFlatMapFunction<T> |
Function<T, Iterable<Double> |
用於flatMapToDouble,以生成DoubleRDD |
DoubleFunction<T> |
Function<T, Double> |
用於mapToDouble,以生成DoubleRDD |
PairFlatMapFunction<T, K, V> |
Function<T, Iterable<Tuple2<K, V>>> |
用於flatMapToPair,以生成PairRDD<K , V> |
PairFunction<T, K, V> |
function<T, Tuple2<K, V>> |
用於mapPair,以生成PairRDD<K ,V> |
我們可以把<3.5.1中1) 使用map()計算RDD中各值的平方》修改為生成一個JavaDoubleRDD,計算RDD中每個元素的平方的示例。
用Java建立DoubleRDD:
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4));
JavaDoubleRDD result = rdd.mapToDouble(new DoubleFunction<Integer>(){
@Override
public double call(Integer x) throws Exception {
return (double) x * x;
}
});
System.out.println("每個元素平方後的值:" + StringUtils.join(result.collect(),",")); //1.0,4.0,9.0,16.0
System.out.println("每個元素平方後的平均值:" + result.mean()); // 7.5
3.6、持久化(快取)
3.6.1、SparkRDD持久化特點
Spark最重要的一個功能,就是在不同操作間,持久化(或快取)一個數據集在記憶體中。當你持久化一個RDD,每一個結點都將把它的計算分塊結果儲存在記憶體中,並在對此資料集(或者衍生出的資料集)進行的其它動作中重用。這將使得後續的動作(action)變得更加迅速(通常快10倍)。快取是用Spark構建迭代演算法的關鍵。RDD的快取能夠在第一次計算完成後,將計算結果儲存到記憶體、本地檔案系統或者Tachyon(分散式記憶體檔案系統)中。通過快取,Spark避免了RDD上的重複計算,能夠極大地提升計算速度。
SparkRDD是惰性求值的,而有時需要多次使用同一個RDD。為了避免多次計算同一個RDD,可以讓Spark對資料進行持久化。當我們讓Spark持久化儲存一個RDD時,計算出RDD的節點會分別儲存他們所求出的分割槽資料。如果有一個持久化資料的節點故障,Spark會在需要用到快取的資料時重新計算丟失的資料分割槽。
3.6.2、如何持久化
Spark通過persist()或cache()方法可以標記一個要被持久化的RDD,一旦首次被觸發,該RDD將會被保留在計算節點的記憶體中並重用。實際上cache()是使用persist()的快捷方法。
首先,在action中計算得到rdd;然後,將其儲存在每個節點的記憶體中。Spark的快取是一個容錯的技術,如果RDD的任何一個分割槽丟失,它可以通過原有的轉換(transformations)操作自動的重複計算並且創建出這個分割槽。
此外,我們可以利用不同的儲存級別儲存每一個被持久化的RDD。例如,它允許我們持久化集合到磁碟上、將集合作為序列化的Java物件持久化到記憶體中、在節點間複製集合或者儲存集合到Tachyon中。我們可以通過傳遞一個StorageLevel物件給persist()方法設定這些儲存級別。cache()方法使用了預設的儲存級別—StorageLevel.MEMORY_ONLY。完整的儲存級別介紹如下圖。
——持久化資料級別
StorageLevel 原始碼:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
說明:上面"_2"代表的是份數,就是把持久化的資料存為2份。
StorageLevel有五個屬性分別是:
private var _useDisk: Boolean, //useDisk_是否使用磁碟
private var _useMemory: Boolean, //useMemory_是否使用記憶體
private var _useOffHeap: Boolean, //useOffHeap_是否使用堆外記憶體如:Tachyon,
private var _deserialized: Boolean,//deserialized_是否進行反序列化
private var _replication: Int = 1) //replication_備份數目。
Spark也會自動持久化(使用者沒有主動呼叫persist
)一些Shuffle過程中的中間資料,這樣做是為了避免在Shuffle期間節點失敗後重新計算整個輸入。所以建議呼叫persist
,如果需要重用RDD的結果。
3.6.3、儲存級別的選擇
Spark的儲存級別旨在提供記憶體使用和CPU效率之間的不同權衡,因此建議通過以下過程來選擇一個:
- 如果你的RDDs適合預設的儲存級別,則不用管。這是CPU效率最高的選項,允許RDD上的操作儘可能快地執行;
- 如果不適合預設的儲存級別,那麼嘗試使用MEMORY_ONLY_SER並選擇一個快速的序列化庫來使物件更加節省空間,但仍然能夠快速訪問。
- 儘量不要溢位資料到磁碟,除非對資料集計算的消耗非常大,或者對資料集進行了大規模的過濾。否則,重新計算分割槽就可能與從磁碟讀取分割槽一樣快了。
- 如果想要快速故障恢復,則使用副本儲存級別。所有的儲存級別都通過重新計算丟失的資料來提供完整的容錯能力,但是副本儲存級別讓你可以繼續在RDD上執行任務,而不用等待重新計算丟失的分割槽。
如下圖:
注意只能設定一種:不然會拋異常: Cannot change storage level of an RDD after it was already assigned a level
3.6.4、儲存級別的選擇
1、呼叫rdd.persist();變數可以這樣設定 如:rdd.persist(StorageLevel.MEMORY_ONLY); 這裡使用了MEMORY_ONLY級別儲存。當然也可以選擇其他的如: rdd.persist(StorageLevel.DISK_ONLY());
2、呼叫rdd.cache()方法,cache()是rdd.persist(StorageLevel.MEMORY_ONLY)的簡寫,效果和他一模一樣的。
3、呼叫rdd.unpersist()清除快取
Demo:
public class SparkCacheDemo {
private static JavaSparkContext sc;
public static void main(String[] args) {
List list = Arrays.asList(5, 4, 3, 2, 1, 6, 9);
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkCacheDemo");
sc = new JavaSparkContext(conf);
JavaRDD rdd = sc.parallelize(list);
// rdd.persist(StorageLevel.DISK_ONLY()); //磁碟儲存
rdd.persist(StorageLevel.MEMORY_ONLY());//記憶體
// rdd.persist(StorageLevel.MEMORY_ONLY_2()); //記憶體儲存兩份
rdd.collect();
rdd.collect(); //這裡可以設定debug斷點便於檢視
rdd.unpersist(); //清除快取
rdd.collect(); //這裡也可以設定debug斷點便於檢視
}
}
啟動後設置上面連個debug點 然後檢視頁面 http://127.0.0.1:4040/storage/ 可以看到相關資訊 如下圖:
磁碟:
記憶體:
3.6.5、移除資料
Spark會自動監控每個節點的快取使用,並使用LRU(least-recently-used)策略刪除舊的分割槽資料。也可以使用rdd.unpersist()
來手動移除資料。