Common Spark transformation operators
1. map and mapPartitions
map applies the transformation function to every element of the RDD, while mapPartitions applies it to every partition; the difference lies in the granularity of the call.
mapPartitions can be read back to front: partition first, then run a map function over each partition.
When to use mapPartitions:
If the mapping needs to create extra objects frequently (a database connection, for example), mapPartitions is much more efficient than map, because the object can be created once per partition instead of once per element.
val seqs = Seq(1, 2, 3, 4, 5, 6)
val numbers: RDD[Int] = sc.parallelize(seqs, 3)

// map
numbers.map(x => {
  println("AAA") // printed 6 times, once per element
  x * 3
}).collect().foreach(println(_))

/**
 * Iterate over the partitions (3 of them)
 */
numbers.mapPartitions(par => {
  println("aaa") // printed 3 times, once per partition
  par.map(p => p * 3)
}).collect().foreach(println(_))
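To make the "expensive object" scenario above concrete, here is a minimal sketch of the pattern; DbConnection and its open/save/close methods are hypothetical placeholders defined inline (not a real API), and the snippet assumes a spark-shell where sc is in scope.

// Hypothetical stand-in for an expensive resource (e.g. a database connection)
class DbConnection { def save(x: String): Unit = println(s"saving $x"); def close(): Unit = () }
object DbConnection { def open(): DbConnection = new DbConnection }

val lines = sc.parallelize(Seq("a", "b", "c", "d"), 2)
val saved = lines.mapPartitions { iter =>
  val conn = DbConnection.open()                // created once per partition
  // Materialize before closing: the iterator is lazy, so consuming it after
  // conn.close() would fail.
  val result = iter.map { x => conn.save(x); x }.toList
  conn.close()
  result.iterator
}
saved.collect()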
2. filter
Filter operation: the elements of the RDD for which the function passed to filter returns true form a new RDD.
val seqs = Seq(1, 2, 3, 4, 5, 6)
val rdd = sc.parallelize(seqs)
// 4, 5, 6
rdd.filter(x => x > 3).collect().foreach(println(_))
3. flatMap
map applies a function to each element of the RDD one by one, mapping it to another RDD, whereas flatMap applies the function to every element of the RDD and builds the new RDD from all the contents of the returned iterators.
The difference is that map is "mapping", while flatMap is "map first, then flatten".
val seqs = Array("aaa AAA", "bbb BBB", "ccc CCC", "ddd DDD")
val numbers = sc.parallelize(seqs)

scala> numbers.map(x => x.split(" ")).collect()
res1: Array[Array[String]] = Array(Array(aaa, AAA), Array(bbb, BBB), Array(ccc, CCC), Array(ddd, DDD))

scala> numbers.flatMap(x => x.split(" ")).collect()
res2: Array[String] = Array(aaa, AAA, bbb, BBB, ccc, CCC, ddd, DDD)
4. mapPartitionsWithIndex
Similar to mapPartitions, but the function also receives an integer partition index as a parameter, so it must have the type (Int, Iterator[T]) => Iterator[U].
// List the elements in each partition of the key-value RDD
val rdd = sc.parallelize(List((1,1), (1,2), (2,3), (2,4), (3,5), (3,6), (4,7), (4,8), (5,9), (5,10)), 3)

def mapPartIndexFunc(i1: Int, iter: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
  var res = List[(Int, (Int, Int))]()
  while (iter.hasNext) {
    val next = iter.next()
    res = (i1, next) :: res
  }
  res.iterator
}

val mapPartIndexRDD = rdd.mapPartitionsWithIndex(mapPartIndexFunc)
mapPartIndexRDD.foreach(println(_))

// Result
(0,(1,1))
(0,(1,2))
(0,(2,3))
(1,(2,4))
(1,(3,5))
(1,(3,6))
(2,(4,7))
(2,(4,8))
(2,(5,9))
(2,(5,10))
5. sample(withReplacement, fraction, seed)
Randomly samples a fraction of the data using the given random seed (fraction is the expected proportion, not an exact count). withReplacement controls whether sampled elements are put back: true means sampling with replacement, false means sampling without replacement.
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> rdd.sample(true,0.57,5).collect
res10: Array[Int] = Array(8, 8, 8, 9)
6. union (set union)
Merges rdd1 and rdd2 only logically; no data is actually merged or transferred, and duplicates are not removed.
scala> var rdd1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> var rdd2 = sc.parallelize(List("aa","dd","ff"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd1.union(rdd2).collect()
res3: Array[String] = Array(aa, aa, bb, cc, dd, aa, dd, ff)
7. intersection
RDD1.intersection(RDD2) returns the intersection of the two RDDs, with duplicates removed.
intersection has to shuffle the data, so it is relatively expensive.
scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
RDD1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> var RDD2 = sc.parallelize(List("aa","dd","ff"))
RDD2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24
scala> RDD1.intersection(RDD2).collect
res5: Array[String] = Array(aa, dd)
8. distinct
distinct removes duplicates. The RDDs we build may contain duplicate elements, and distinct eliminates them, but it involves a shuffle, so the operation is expensive.
scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
RDD1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> RDD1.collect
res4: Array[String] = Array(aa, aa, bb, cc, dd)
scala> val distinctRDD = RDD1.distinct.collect
distinctRDD: Array[String] = Array(aa, dd, bb, cc)
9. groupByKey
groupByKey groups an RDD[(key, value)] by key into an RDD[(key, Iterable[value])], somewhat like GROUP BY in SQL, for example similar to MySQL's group_concat.
// Group the students' scores by student name
scala> val scoreDetail = sc.parallelize(List(("xiaoming",75),("xiaoming",90),("lihua",95),("lihua",100),("xiaofeng",85)))
scoreDetail: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> scoreDetail.groupByKey().collect().foreach(println(_))
(lihua,CompactBuffer(95, 100))
(xiaoming,CompactBuffer(75, 90))
(xiaofeng,CompactBuffer(85))
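As a quick illustration of the group_concat analogy, a small sketch reusing the scoreDetail RDD above concatenates the grouped values with mapValues; the expected output should look like (lihua,95|100), (xiaoming,75|90), (xiaofeng,85), ordering aside.

// Concatenate each student's scores into one string, similar to group_concat
scoreDetail.groupByKey().mapValues(_.mkString("|")).collect().foreach(println(_))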
10. reduceByKey
Takes a function and reduces the values that share the same key, similar to Scala's reduce operation.
For example, reducing the RDD {(1, 2), (3, 4), (3, 6)}: the keys stay unchanged and the values with the same key are added.
scala> var mapRDD = sc.parallelize(List((1,2),(3,4),(3,6)))
mapRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> var reduceRDD = mapRDD.reduceByKey(_+_)
reduceRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at reduceByKey at <console>:26
scala> reduceRDD.foreach(x=>println(x))
(1,2)
(3,10)
11. aggregateByKey
Aggregates the values of the same key in a pair RDD, using an initial value during the aggregation. Because aggregateByKey aggregates values per key, the final result is still a pair RDD of each key and its aggregated value.
val data = sc.parallelize(List((1,3), (1,2), (1,4), (2,3), (3,7), (3,8)), 1)
//println(data.partitions.size)

/**
 * Take the maximum of two values for the same key within a partition.
 * For key 1 the first call is max(1, 3), where 1 is the initial value, giving 3;
 * the second is max(3, 2), giving 3; the third is max(3, 4), giving 4,
 * so the result for key 1 is (1, 4).
 */
def seq(a: Int, b: Int): Int = {
  math.max(a, b)
}

/**
 * Add the values of the same key from different partitions.
 * With only one partition this function has no effect.
 */
def comb(a: Int, b: Int): Int = {
  a + b
}

// Aggregate and print the result
data.aggregateByKey(1)(seq, comb).collect.foreach(println(_))

// Inspect the data in each partition
data.mapPartitionsWithIndex {
  (partid, iter) => {
    val part_map = scala.collection.mutable.Map[String, List[(Int, Int)]]()
    val part_name = "part_" + partid
    part_map(part_name) = List[(Int, Int)]()
    while (iter.hasNext) {
      part_map(part_name) :+= iter.next() // :+= appends an element to the end of the list
    }
    part_map.iterator
  }
}.collect().foreach(println(_))
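To actually see comb at work, a small variation of the example (assuming the same seq and comb definitions above) uses data whose keys span both partitions; the per-partition maxima for key 1 are then summed by comb.

// Key 1 appears in both partitions: partition 0 holds (1,3), (2,3), (1,2)
// and partition 1 holds (3,7), (1,4), (3,8).
val data2 = sc.parallelize(List((1,3), (2,3), (1,2), (3,7), (1,4), (3,8)), 2)
// For key 1: partition 0 gives max(max(1,3), 2) = 3, partition 1 gives max(1,4) = 4,
// and comb adds them: 3 + 4 = 7. The expected result therefore contains (1,7),
// together with (2,3) and (3,8).
data2.aggregateByKey(1)(seq, comb).collect.foreach(println(_))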
12. sortByKey
Sorts a pair RDD by key. The first parameter can be set to true or false; the default is true (ascending).
scala> val rdd = sc.parallelize(Array((3, 4),(1, 2),(4,4),(2,5), (6,5), (5, 6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> rdd.sortByKey().collect
res4: Array[(Int, Int)] = Array((1,2), (2,5), (3,4), (4,4), (5,6), (6,5))
scala> rdd.sortByKey(true).collect
res5: Array[(Int, Int)] = Array((1,2), (2,5), (3,4), (4,4), (5,6), (6,5))
scala> rdd.sortByKey(false).collect
res6: Array[(Int, Int)] = Array((6,5), (5,6), (4,4), (3,4), (2,5), (1,2))
13. join
RDD1.join(RDD2) connects the entries of RDD1 and RDD2 that share the same key, like an inner join in SQL: only the data that matches on both sides is returned.
scala> val RDD1 = sc.parallelize(Array(("A","a1"),("B","b1"),("C","c1"),("D","d1"),("E","e1"),("F","f1")))
RDD1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[19] at parallelize at <console>:24
scala> val RDD2 = sc.parallelize(Array(("A","a2"),("B","b2"),("C","c1"),("C","c2"),("C","c3"),("E","e2")))
RDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> RDD1.join(RDD2).collect
res8: Array[(String, (String, String))] = Array((B,(b1,b2)), (A,(a1,a2)), (C,(c1,c1)), (C,(c1,c2)), (C,(c1,c3)), (E,(e1,e2)))
scala> RDD2.join(RDD1).collect
res9: Array[(String, (String, String))] = Array((B,(b2,b1)), (A,(a2,a1)), (C,(c1,c1)), (C,(c2,c1)), (C,(c3,c1)), (E,(e2,e1)))
Other join operations:
left outer join: the left side is the baseline (every record from the left side (a) is kept; where the right side (b) has a match it returns Some(x), otherwise None is filled in).
scala> RDD1.leftOuterJoin(RDD2).collect
res11: Array[(String, (String, Option[String]))] = Array((B,(b1,Some(b2))), (A,(a1,Some(a2))), (C,(c1,Some(c1))), (C,(c1,Some(c2))), (C,(c1,Some(c3))), (E,(e1,Some(e2))), (F,(f1,None)), (D,(d1,None)))
scala> RDD2.leftOuterJoin(RDD1).collect
res12: Array[(String, (String, Option[String]))] = Array((B,(b2,Some(b1))), (A,(a2,Some(a1))), (C,(c1,Some(c1))), (C,(c2,Some(c1))), (C,(c3,Some(c1))), (E,(e2,Some(e1))))
right outer join: the right side is the baseline (every record from the right side (b) is kept; where the left side (a) has a match it returns Some(x), otherwise None is filled in).
scala> RDD1.rightOuterJoin(RDD2).collect
res13: Array[(String, (Option[String], String))] = Array((B,(Some(b1),b2)), (A,(Some(a1),a2)), (C,(Some(c1),c1)), (C,(Some(c1),c2)), (C,(Some(c1),c3)), (E,(Some(e1),e2)))
scala> RDD2.rightOuterJoin(RDD1).collect
res14: Array[(String, (Option[String], String))] = Array((B,(Some(b2),b1)), (A,(Some(a2),a1)), (C,(Some(c1),c1)), (C,(Some(c2),c1)), (C,(Some(c3),c1)), (E,(Some(e2),e1)), (F,(None,f1)), (D,(None,d1)))
full outer join: records from both the left and the right sides are always kept.
scala> RDD1.fullOuterJoin(RDD2).collect
res16: Array[(String, (Option[String], Option[String]))] = Array((B,(Some(b1),Some(b2))), (A,(Some(a1),Some(a2))), (C,(Some(c1),Some(c1))), (C,(Some(c1),Some(c2))), (C,(Some(c1),Some(c3))), (E,(Some(e1),Some(e2))), (F,(Some(f1),None)), (D,(Some(d1),None)))
scala> RDD2.fullOuterJoin(RDD1).collect
res17: Array[(String, (Option[String], Option[String]))] = Array((B,(Some(b2),Some(b1))), (A,(Some(a2),Some(a1))), (C,(Some(c1),Some(c1))), (C,(Some(c2),Some(c1))), (C,(Some(c3),Some(c1))), (E,(Some(e2),Some(e1))), (F,(None,Some(f1))), (D,(None,Some(d1))))
14. cogroup
For the key-value elements of two RDDs, the elements with the same key within each RDD are gathered into a collection.
Unlike reduceByKey, the merging happens across the same keys of the two RDDs.
In the example, the values of the same key from the two RDDs are combined. rdd1 has no element with key dd (and hence no value for it); in the combined result the slot for rdd1 is an empty CompactBuffer() rather than being dropped.
scala> val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(Array(("aa",3),("bb",4),("cc",5),("dd",6),("aa",8)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val rdd3 = rdd1.cogroup(rdd2).collect
rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((aa,(CompactBuffer(1),CompactBuffer(3, 8))), (dd,(CompactBuffer(),CompactBuffer(6))), (bb,(CompactBuffer(2),CompactBuffer(4))), (cc,(CompactBuffer(6),CompactBuffer(5))))
15. cartesian (Cartesian product)
RDD1.cartesian(RDD2) returns the Cartesian product of RDD1 and RDD2; this is very expensive.
scala> var RDD1 = sc.parallelize(List("1","2","3"))
RDD1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> var RDD2 = sc.parallelize(List("a","b","c"))
RDD2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> RDD1.cartesian(RDD2).collect
res0: Array[(String, String)] = Array((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c))
16. pipe
There is a special kind of RDD, the PipedRDD, which provides a way to call external programs such as CUDA-based C++ code so that the computation can run faster. Caffe on Spark and TensorFlow on Spark are also built on this mechanism.
# Prepare the script
#!/bin/sh
echo "Running shell script"
while read LINE; do
echo ${LINE}!
done
# Call it from a Spark RDD
scala> val data = sc.parallelize(List("hi","hello","how","are","you"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24
scala> val scriptPath = "/home/hadoop/echo.sh"
scriptPath: String = /home/hadoop/echo.sh
scala> val pipeRDD = data.pipe(scriptPath)
pipeRDD: org.apache.spark.rdd.RDD[String] = PipedRDD[53] at pipe at <console>:28
scala> pipeRDD.collect()
res21: Array[String] = Array(Running shell script, hi!, hello!, how!, are!, you!)
17. coalesce and repartition
Both of them repartition an RDD; repartition is simply a shorthand for coalesce with shuffle set to true.
repartition always triggers a shuffle,
whereas coalesce does not necessarily.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
Suppose the RDD has N partitions and needs to be repartitioned into M partitions:
1) N < M. Usually the data is unevenly distributed across the N partitions, and a HashPartitioner is used to redistribute it into M partitions; in this case shuffle must be set to true.
2) N > M and N and M are of similar magnitude (say N is 1000 and M is 100). Several of the N partitions can be merged into one new partition until M partitions remain, so shuffle can be set to false. With shuffle = false, if M > N coalesce has no effect: no shuffle takes place and the parent and child RDDs form a narrow dependency.
3) N > M and the two differ drastically. If shuffle is set to false, the parent and child RDDs form a narrow dependency and sit in the same stage, which may leave the Spark job with too little parallelism and hurt performance. When M is 1, you can set shuffle to true so that the operations before the coalesce keep better parallelism.
In short: with shuffle = false, if the requested number of partitions is larger than the current one, the partition count stays unchanged; without a shuffle you cannot increase the number of partitions of an RDD (see the sketch below).
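A minimal sketch (assuming a spark-shell where sc is available) that only checks the resulting partition counts, matching the rules above:

val rddN = sc.parallelize(1 to 100, 10)
println(rddN.coalesce(2).partitions.size)                   // 2: partitions merged, no shuffle, narrow dependency
println(rddN.coalesce(20).partitions.size)                  // still 10: cannot grow the partition count without a shuffle
println(rddN.coalesce(20, shuffle = true).partitions.size)  // 20: a shuffle is performed
println(rddN.repartition(20).partitions.size)               // 20: same as coalesce(20, shuffle = true)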
18. repartitionAndSortWithinPartitions
Repartitions the RDD according to the given partitioner and sorts the records by key within each resulting partition. This is more efficient than calling repartition and then sorting within each partition, because the sorting can be pushed down into the shuffle machinery.
repartitionAndSortWithinPartitions counts as an efficient operator because it beats repartition followed by sortByKey: its sorting happens during the shuffle, shuffling and sorting at the same time.
package core

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object TransformationDemo {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(sparkConf)
    val rdd1: RDD[(Int, Int)] = sc.parallelize(List((1,2), (2,3), (3,7), (4,8), (5,2), (6,5), (7,7)))
    // 1
    println(rdd1.partitions.size)
    /**
     * (0,(7,7))
     * (0,(6,5))
     * (0,(5,2))
     * (0,(4,8))
     * (0,(3,7))
     * (0,(2,3))
     * (0,(1,2))
     */
    rdd1.mapPartitionsWithIndex(mapPartIndexFunc).foreach(println)
    // Repartition and sort within each partition (by key, ascending by default)
    val rdd2: RDD[(Int, Int)] = rdd1.repartitionAndSortWithinPartitions(new HashPartitioner(3))
    // 3
    println(rdd2.partitions.size)
    /**
     * (0,(6,5))
     * (0,(3,7))
     * (1,(7,7))
     * (1,(4,8))
     * (1,(1,2))
     * (2,(5,2))
     * (2,(2,3))
     */
    rdd2.mapPartitionsWithIndex(mapPartIndexFunc).foreach(println)
    /**
     * (3,7)
     * (6,5)
     * (1,2)
     * (4,8)
     * (7,7)
     * (2,3)
     * (5,2)
     */
    rdd2.collect().foreach(println)
    sc.stop()
  }

  /**
   * Walk through the data of each partition, tagging every element with its partition index
   * @param i1   partition index
   * @param iter iterator over the elements of that partition
   * @return     iterator of (partition index, element) pairs
   */
  def mapPartIndexFunc(i1: Int, iter: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
    var res = List[(Int, (Int, Int))]()
    while (iter.hasNext) {
      val next = iter.next()
      res = (i1, next) :: res
    }
    res.iterator
  }
}