Spark——RDD操作詳解
一、基本RDD
1、針對各個元素的轉化操作
最常用的轉化操作是map()和filter()。轉化操作map()J接收一個函式,把這個函式用於RDD中的每一個元素,將函式的返回結果作為結果RDD中對應元素。而轉化操作filter()則接收一個函式,將RDD滿足該函式的元素放入新的RDD中返回。map()的返回值型別不需要和輸入型別一樣。
從一個RDD變成另外一個RDD。lazy,懶執行 。比如根據謂詞匹配篩選資料就是一個轉換操作。
例:求平均值
Scala:
val input=sc.parallelize(List(1,2,3,4))
val result=input.map(x=>x *x)
println(result.collect().mkString(","))
java:
@Test
public void rddSquare(){
SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount");
JavaSparkContext sc = new JavaSparkContext( sparkConf);
JavaRDD<Integer> rdd= sc.parallelize(Arrays. asList(1,2,3,4));
JavaRDD< Integer> result= rdd.map( new Function<Integer, Integer>() {
@Override
public Integer call(Integer x ) throws Exception {
return x *x ;
}
});
System. err.println(StringUtils.join( result.collect(), ""));
}
flatMap()方法可以實現對每個輸入元素生成多個輸出元素,返回一個返回值序列的迭代器。其一個簡單用途就是把輸入的字串切分為單詞。
Scala:
val lines=sc.parallelize(List("hello word","hi","I'm back"))
val words=lines.flatMap(line=>line.split(" "))
words..first()
Java:
@Test
public void rddFlatMap(){
JavaRDD<String> lines= sc.parallelize(Arrays.asList( "hello word","hi" ,"i am back"));
JavaRDD<String> words= lines.flatMap( new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line ) throws Exception {
return Arrays.asList( line.split( " ")).iterator();
}
});
System. err.println(words .first());
}
flatMap()和map()方法的區別:flatMap()相當於看作返回來的迭代器的“壓扁”,這樣就得到一個由各個列表中的元素組成的RDD。
例如:
map()的結果:{[“coffe”,”panda”],[“happy”,”panda”],[“happies”,”panda”,”party”]}
flatMap()的結果:{“coffe”,”panda” ,”happy”,”panda” ,”happies”,”panda”,”party” }
filter()操作不會改變已有的inputRDD中的資料。
通過轉化操作,從已有的RDD中派生出新的RDD,spark會使用譜系圖來記錄這些不同RDD之間的依賴關係。spark需要用這些資訊來按需計算每個RDD,也可以依靠譜系圖在持久化的RDD丟失部分資料時恢復所丟失的資料。
Scala:
val inputRDD=sc.textFile("log.txt")
val errorRDD=inputRDD.filter(line=>line.contains("error"))
Java:
JavaRDD<String> inputRDD=sc.textFile("log.txt");
JavaRDD<String> errorRDD=inputRDD.filter(
new Function<String,Boolean>(){
public Boolean call(String x){
return x.contains("error");
}
}
);
filter()操作不會改變已有的inputRDD中的資料。
通過轉化操作,從已有的RDD中派生出新的RDD,spark會使用譜系圖來記錄這些不同RDD之間的依賴關係。spark需要用這些資訊來按需計算每個RDD,也可以依靠譜系圖在持久化的RDD丟失部分資料時恢復所丟失的資料。
2、偽集合操作
RDD.distinct()方法轉換操作生成一個只包含不同元素的一個新的RDD。開銷很大。
RDD.union(otherRDD),會返回一個包含兩個RDD中所有元素的RDD,包含重複資料。
RDD.intersection(otherRDD),只返回兩個RDD中都有的元素。可能會去掉所有的重複元素。通過網路混洗來發現共有元素。
RDD.subtract(otherRDD)返回只存在第一個RDD中而不存在第二個RDD中的所有的元素組成的RDD。也需要網路混洗。
RDD.cartesian(otherRDD),計算兩個RDD的笛卡爾積,轉化操作會返回所有可能的(a,b)對,其中a是源RDD中的元素,而b則來自於另一個RDD。
對一個數據為{1,2,3,3}的RDD進行操作進行基本的RDD轉化操作
3、行動操作
RDD最常見的行動操作:reduce()操作,它接受一個函式作為引數,這個函式要操作兩個相同型別的RDD資料並返回一個同樣型別的新元素。
reduce將RDD中元素兩兩傳遞給輸入函式,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函式直到最後只有一個值為止。
行動操作會對RDD計算一個結果,並把結果返回到驅動程式中,或把結果儲存到外部儲存系統中(如HDFS)中。
Scala:
val rdd=sc.parallelize(List(1,2,3,3))
val sum=rdd.reduce((x,y)=>x+y)
Java:
/**
* java中的reduce()方法
*/
public void testReduce(){
JavaRDD<Integer> rdd= sc.parallelize(Arrays.asList(1,2,3,3));
Integer sum= rdd.reduce( new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer x , Integer y ) throws Exception {
return x +y ;
}
});
System. err.println(sum );
}
flod()方法與reduce()方法類似,接受一個與redce()接受的函式相同的函式,再加上一個“初始值”來作為分割槽第一次呼叫時的結果。
兩者都要求函式的返回值型別需要和我們所操作的RDD中的元素型別相同。
aggregate()函式則把我們從返回值型別必須與所操作的RDD型別相同的限制中解放出來。可以計算兩個RDD的平均值。
Scala:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
val result=rdd.aggregate((0,0))((acc,value)=>(acc._1+value,acc._2+1),(acc1,acc2)=>(acc1._1+acc2._1,acc1._2+acc2._2))
val avg=result._1/result._2.toDouble
Java:
public class AvgCount implements Serializable {
private static final long serialVersionUID = 1L;
private final static SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount" );
private final static JavaSparkContext sc = new JavaSparkContext( sparkConf);
public int total ;
public int num ;
public AvgCount(int total , int num) {
this. total = total;
this. num = num;
}
public double avg(){
return total /(double)num;
}
static Function2<AvgCount,Integer,AvgCount> addAndCount=
new Function2<AvgCount, Integer, AvgCount>() {
@Override
public AvgCount call(AvgCount a , Integer x ) throws Exception {
a. total+= x;
a. num+=1;
return a ;
}
};
static Function2<AvgCount, AvgCount, AvgCount> combine=
new Function2<AvgCount, AvgCount, AvgCount>() {
@Override
public AvgCount call(AvgCount a , AvgCount b ) throws Exception {
a. total+= b. total;
a. num+= b. num;
return a ;
}
};
public static void main(String[] args) {
AvgCount initial= new AvgCount(0, 0);
JavaRDD<Integer> rdd= sc.parallelize(Arrays.asList(1,2,3,4,5,6));
AvgCount result= rdd.aggregate( initial, addAndCount, combine );
System. err.println(result .avg());
}
}
collect()方法會返回整個RDD的內容。測試中使用。RDD內容不多。
take(n)返回RDD的第n個元素。並且嘗試只訪問儘量少的分割槽,因此該操作會得到一個不均衡的集合。可能返回的元素會跟預期的不太一樣。
top()按照RDD元素的順序,返回RDD的前幾個元素。
first()就是一個行動操作,他會返回RDD的第一個元素。
觸發計算,進行實際的資料處理
Scala:
print "input had "+badLinesRDD.count()+" concering lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10).foreach(println)
Java:
System.out.println("input had "+badLinesRDD.count()+" concering lines" );
System.out.print("Here are 10 examples:" );
for(Sring line:badLinesRDD.take(10)){
System.out.println(line);
}
對一個數據為{1,2,3,3}的RDD進行基本的RDD行動操作如表:。
兩者的區別在於Spark計算RDD的方式不同。雖然你可以在任何時候去定義新的RDD,但Spark只會惰性計算這些RDD,他們只有在第一次在一個行動操作中用到時,才會真正計算。
二、在不同RDD型別間轉換
在Scala中將RDD轉為特定函式的RDD是由隱式轉換自動處理的。需要加上import org.apache.spark.SparkContext._來使用在這些隱式轉換。這些隱式轉換可以隱式的將一個RDD轉換為各種封裝,比如DoubleRDDFunctions(數值資料的RDD)和PairRDDFunction(鍵值對RDD)。
在Java中有兩個專門的類JavaDoubleRDD和JavaPairRDD,來處理特殊型別的RDD。
Java中針對專門型別的函式介面:
/**
* java建立DoubleRDD
* @author Administrator
*
*/
public class DoubleRDD implements Serializable {
private static final long serialVersionUID = 1L;
private final static SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount" );
private final static JavaSparkContext sc = new JavaSparkContext( sparkConf);
public void testDoubleRDD(){
JavaRDD<Integer> rdd= sc.parallelize(Arrays.asList(1,2,3,4,5));
JavaDoubleRDD result= rdd.mapToDouble( new DoubleFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public double call(Integer x) throws Exception {
return (double )x *x ;
}
});
System. err.println(result );
}
}
三、持久化(快取)
當我們讓Spark持久化儲存一個RDD時,計算出RDD的節點會分別儲存他們所有求出的分割槽資料。如果一個有持久化資料的節點發生故障,spark會在需要用到的快取資料時重算丟失的資料分割槽。可以把資料備份到多個節點上。
在scala和java中,預設情況下persist()會把資料以序列化的形式快取到JVM的堆空間中。
org.apache.spark.storage.StorageLevel和py.StorageLevel中的持久化級別;如有必要可以通過在儲存級別的末尾加上”_2”來把持久化資料存為兩份:
在Scala中使用persist();
import org.apache.spark.storage.StorageLevel
val result=input.map(x=>x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
在第一次對這個RDD呼叫行動操作前就呼叫了persist()方法,persist()呼叫本身不會觸發強制求值。
如果快取的資料太多,記憶體中放不下,Spark會自動利用最近最少使用(LRU)的快取策略把最老的分割槽從記憶體中移除。當然對於使用記憶體和磁碟快取級別的分割槽來說,移除的資料會寫如磁碟。
最後,還可以使用unpersist()方法手動把持久化的RDD從快取中移除。
cache()方法,是延遲執行,需要在一個action執行之後,進行快取RDD。是persist特殊快取方式。將RDD放入記憶體中,快取級別是MEMORY_ONLY