spark Transformation 運算元
map(func)
通過函式func傳遞源的每個元素來形成一個新的分散式資料集
val arr=sc.parallelize(Array(("A",1),("B",2),("C",3)))
arr.map(x=>(x._1+x._2)).foreach(println)
print:
A1
B2
C3
mapValues
mapValues顧名思義就是輸入函式應用於RDD中Kev-Value的Value,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函式只適用於元素為KV對的RDD。
scala> val a = sc.parallelize (List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))
mapWith
mapWith是map的另外一個變種,map只需要一個輸入函式,而mapWith有兩個輸入函式。它的定義如下:
def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]
第一個函式constructA是把RDD的partition index(index從0開始)作為輸入,輸出為新型別A;
第二個函式f是把二元組(T, A)作為輸入(其中T為原RDD中的元素,A為第一個函式的輸出),輸出型別為U。
舉例:把partition index 乘以10,然後加上2作為新的RDD的元素。
val x = sc.parallelize(List(1 ,2,3,4,5,6,7,8,9,10), 3)
x.mapWith(a => a * 10)((a, b) => (b + 2)).collect
res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)
flatMap(func):
與map類似,但每個元素輸入項都可以被對映到0個或多個的輸出項,最終將結果”扁平化“後輸出
val arr=sc.parallelize(Array(("A",1),("B",2),("C",3)))
arr.flatmap(x=>(x._1+x._2)).foreach(println)
print:
A
1
B
2
C
3
flatMapWith
flatMapWith與mapWith很類似,都是接收兩個函式,一個函式把partitionIndex作為輸入,輸出是一個新型別A;另外一個函式是以二元組(T,A)作為輸入,輸出為一個序列,這些序列裡面的元素組成了新的RDD。它的定義如下:
def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]
scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)
flatMapValues
flatMapValues類似於mapValues,不同的在於flatMapValues應用於元素為KV對的RDD中Value。每個一元素的Value被輸入函式對映為一系列的值,然後這些值再與原RDD中的Key組成一系列新的KV對。
scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val b = a.flatMapValues(x=>x.to(5))
scala> b.collect
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))
上述例子中原RDD中每個元素的值被轉換為一個序列(從其當前值到5),比如第一個KV對(1,2), 其值2被轉換為2,3,4,5。然後其再與原KV對中Key組成一系列新的KV對(1,2),(1,3),(1,4),(1,5)。
mapPartitions(func):
將map函式作用與每個分割槽,函式型別為:Iterator[T] => Iterator[U]
一般在資料庫操作中用到
var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分割槽
scala> var rdd3 = rdd1.mapPartitions{ x => {
| var result = List[Int]()
| var i = 0
| while(x.hasNext){
| i += x.next()
| }
| result.::(i).iterator //要求輸入和返回都Iterator[U]型別,故將i轉換為iterator
| }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23
//rdd3將rdd1中每個分割槽中的數值累加
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)
scala> rdd3.partitions.size
res66: Int = 2
mapPartitionsWithIndex(func):
類似於mapPartitions,而且還提供FUNC與表示所述分割槽的索引的整數值,所以FUNC必須是型別(Int, Iterator[T]) => Iterator[U]
var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分割槽
var rdd2 = rdd1.mapPartitionsWithIndex{
(x,iter) => {
var result = List[String]()
var i = 0
while(iter.hasNext){
i += iter.next()
}
result.::(x + "|" + i).iterator
}
}
//rdd2將rdd1中每個分割槽的數字累加,並在每個分割槽的累加結果前面加了分割槽索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)
filter(func):
增加過濾器,針對一維的資料
val rdd = sc.parallelize(1 to 5)
rdd.filter(_>3).collect().foreach(println)
4
5
sample(withReplacement,fraction,seed):
指定的隨機種子隨機抽樣
withReplacement:抽出的資料是否放回,true為有放回的抽樣,false為無放回的抽樣
fraction :抽樣的數量
seed:隨機種子
val rdd = sc.parallelize(1 to 10)
val sample1 = rdd.sample(true,0.5,3)
sample1.collect.foreach(println)
9
union(ortherDataset):
將兩個Rdd資料集合並返回
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.union(rdd2)
unionRDD.collect.foreach(println)
輸出:
1
2
3
3
4
5
intersection(otherDataset):
返回兩個Rdd的交集
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.intersection(rdd2)
unionRDD.collect.foreach(println)
輸出:
3
distinct([ numTasks ])
對RDD資料集進行去重
val list = List(1,1,2,5,2,9,6,1)
val distinctRDD = sc.parallelize(list)
val unionRDD = distinctRDD.distinct()
unionRDD.collect.foreach(println)
1
9
5
6
2
groupByKey([ numTasks ])
當呼叫(K,V)對的資料集時,返回(K,Iterable )對的資料集。
一般優先選擇reduceByKey,combineByKey,foldByKey
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum)).collect().foreach(println)
輸出:
(two,2)
(one,1)
(three,3)
reduceByKey(func,[ numTasks ])
當呼叫(K,V)對的資料集時,返回(K,V)對的資料集,其中每個鍵的值使用給定的reduce函式func進行聚合,函式func必須是(V,V)=> V.像in一樣groupByKey,reduce任務的數量可以通過可選的第二個引數來配置。
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD
.reduceByKey(_ + _)
.collect().foreach(println)
輸出:
(two,2)
(one,1)
(three,3)
aggregate(zeroValue,seq,comb,taskNums)
將初始值和第一個分割槽中的第一個元素傳遞給seq函式進行計算,然後將計算結果和第二個元素傳遞給seq函式,直到計算到最後一個值。第二個分割槽中也是同理操作。最後將初始值、所有分割槽的結果經過combine函式進行計算(先將前兩個結果進行計算,將返回結果和下一個結果傳給combine函式,以此類推),並返回最終結果。
>>> data = sc.parallelize((1,2,3,4,5,6),2)
>>> def seq(a,b):
... print 'seqOp:'+str(a)+"\t"+str(b)
... return min(a,b)
...
>>> def combine(a,b):
... print 'comOp:'+str(a)+"\t"+str(b)
... return a+b
...
>>> data.aggregate(3,seq,combine)
seqOp:3 1
seqOp:1 2
seqOp:1 3
seqOp:3 4
seqOp:3 5
seqOp:3 6
comOp:3 1
comOp:4 3
7
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.4\\hadoop-2.6.4")
val conf = new SparkConf().setAppName("Join").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)),2)
def seq(a:Int, b:Int): Int = {
println("seq: " + a + "\t " + b)
math.max(a, b)
}
def comb(a:Int,b:Int):Int={
println("comb : "+a+"\t"+b)
a+b
}
val result = data.aggregateByKey(0)(seq,comb) //.collect()
result.collect().foreach(println)
print("==="+data.partitions.size)
}
從上面的程式碼的輸出結果可以看出,1,2,3被分到一個分割槽中,4,5,6被分到一個分割槽中。3先和第一個元素1傳給seq函式,返回最小值1,然後將1和第二個元素2傳給seq函式,返回1,以此類推,最後返回第一個分割槽中的最小值1。第二個分割槽一樣道理,最後結果返回最小值3.最後將初始值3和兩個分割槽的結果經過combine函式進行計算,先將初始值3和第一個分割槽的結果1傳給combine函式,返回4,然後將4和第二個分割槽結果3傳給combine函式,返回最終結果7。
簡單說就是先分割槽中使用seq(都帶入初始值),在不同分割槽 使用comb函式(也帶入初始值)
aggregateByKey(zeroValue)(seqOp,combOp,[ numTasks ])
當呼叫(K,V)對的資料集時,返回(K,U)對的資料集,其中使用給定的組合函式和中性的“零”值來彙總每個鍵的值。允許與輸入值型別不同的聚合值型別,同時避免不必要的分配。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AggregateByKeyOp {
def main(args:Array[String]){
val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local")
val sc: SparkContext = new SparkContext(sparkConf)
val data=List((1,3),(1,2),(1,4),(2,3))
val rdd=sc.parallelize(data, 2)
//合併不同partition中的值,a,b得資料型別為zeroValue的資料型別
def combOp(a:String,b:String):String={
println("combOp: "+a+"\t"+b)
a+b
}
//合併在同一個partition中的值,a的資料型別為zeroValue的資料型別,b的資料型別為原value的資料型別
def seqOp(a:String,b:Int):String={
println("SeqOp:"+a+"\t"+b)
a+b
}
rdd.foreach(println)
//zeroValue:中立值,定義返回value的型別,並參與運算
//seqOp:用來在同一個partition中合併值
//combOp:用來在不同partiton中合併值
val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp)
sc.stop()
}
}
將資料拆分成兩個分割槽
//分割槽一資料
(1,3)
(1,2)
//分割槽二資料
(1,4)
(2,3)
//分割槽一相同key的資料進行合併
seq: 100 3 //(1,3)開始和中立值進行合併 合併結果為 1003
seq: 1003 2 //(1,2)再次合併 結果為 10032
//分割槽二相同key的資料進行合併
seq: 100 4 //(1,4) 開始和中立值進行合併 1004
seq: 100 3 //(2,3) 開始和中立值進行合併 1003
將兩個分割槽的結果進行合併
//key為2的,只在一個分割槽存在,不需要合併 (2,1003)
(2,1003)
//key為1的, 在兩個分割槽存在,並且資料型別一致,合併
comb: 10032 1004
(1,100321004)
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.4\\hadoop-2.6.4")
val conf = new SparkConf().setAppName("Join").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1,2,3,4,5,6),2)
def seq(a:Int, b:Int): Int = {
println("seq: " + a + "\t " + b)
math.min(a, b)
}
def comb(a:Int,b:Int):Int={
println("comb : "+a+"\t"+b)
a+b
}
println("====="+data.aggregate(3)(seq,comb))
}
簡單說就是先分割槽中不同的key使用seq(不同key都帶入初始值),在不同分割槽的key 使用comb函式(不帶初始值)
sortByKey([ascending], [numTasks])
當呼叫K實現Ordered的(K,V)對的資料集時,按照布林ascending引數中的指定,按照升序或降序返回按鍵排序的(K,V)對的資料集。
val d2 = sc.parallelize(Array(("cc",32),("bb",32),("cc",22),("aa",18),("bb",6),("dd",16),("ee",104),("cc",1),("ff",13),("gg",68),("bb",44)))
d2.sortByKey(false).collect
輸出:
res60: Array[(String, Int)] = Array((gg,68), (ff,13), (ee,104), (dd,16), (cc,32), (cc,22), (cc,1), (bb,32), (bb,6), (bb,44), (aa,18))
join(otherDataset, [numTasks])
當(K,V)和(K,W)型別的資料集被呼叫時,返回(K,(V,W))對的每個鍵的所有元素對的資料集。外連線通過支援leftOuterJoin,rightOuterJoin和fullOuterJoin。
val a = sc.parallelize(Array(("123",4.0),("456",9.0),("789",9.0))
val b = sc.parallelize(Array(("123",8.0),("789",10)))
val c = a.join(b)
c.foreach(println)
/*
(123,(4.0,8.0))
(789,(9.0,10))
*/
cogroup(otherDataset, [numTasks])
當(K,V)和(K,W)型別的資料集被呼叫時,返回(K,(Iterable ,Iterable ))元組的資料集。這個操作也被稱為groupWith。
val a = sc.parallelize(Array(("123",4.0),("456",9.0),("789",9.0))
val b = sc.parallelize(Array(("123",8.0),("789",10)))
val d = a.cogroup(b)
d.foreach(println)
/*
(456,(CompactBuffer(9.0),CompactBuffer()))
(123,(CompactBuffer(4.0),CompactBuffer(8.0)))
(789,(CompactBuffer(9.0),CompactBuffer(10)))
*/
pipe(command, [envVars])
通過shell命令管理RDD的每個分割槽,例如Perl或bash指令碼。RDD元素被寫入程序的stdin,輸出到stdout的行被作為字串的RDD返回。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object PipeTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("pipe Test")
val sc = new SparkContext(sparkConf)
val data1 = Array[(String, Int)](
("K1", 1), ("K2", 2),
("U1", 3), ("U2", 4),
("W1", 3), ("W2", 4), )
val pairs = sc.parallelize(data1, 3)
val finalRDD = pairs.pipe("grep 2")
finalRDD.foreach(x => println("!!!!!!!! " + x + " !!!!!!!!"))//輸出
//(K2,2)
//(U2,4)
//(W2,4)
sc.stop()
}
}
cartesian(otherDataset):
對兩Rdd進笛卡爾積操作
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.collect.foreach(println)
(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)
coalesce(numPartitions,shuffle):
對RDD的分割槽進行重新分割槽,shuffle預設值為false,當shuffle=false時,不能增加分割槽數
目,但不會報錯,只是分割槽個數還是原來的
val rdd = sc.parallelize(1 to 16,4)
val coalesceRDD = rdd.coalesce(3) //當suffle的值為false時,不能增加分割槽數(即分割槽數不能從5->7)
println("重新分割槽後的分割槽個數:"+coalesceRDD.partitions.size)
輸出
重新分割槽後的分割槽個數:3
repartition(numPartition):
是函式coalesce(numPartition,true)的實現
glom():
將RDD的每個分割槽中的型別為T的元素轉換換陣列Array[T]
val rdd = sc.parallelize(1 to 16,4)
val glomRDD = rdd.glom() //RDD[Array[T]]
glomRDD.foreach(rdd => println(rdd.getClass.getSimpleName))
randomSplit(weight:Array[Double],seed):
val rdd = sc.parallelize(1 to 10)
val randomSplitRDD = rdd.randomSplit(Array(1.0,2.0,7.0))
scala> randomSplitRDD(0).foreach(x => print(x +" "))
為空
scala> randomSplitRDD(1).foreach(x => print(x +" "))
8
scala> randomSplitRDD(2).foreach(x => print(x +" "))
9 3 1 6 2 4 10 5 7
repartitionAndSortWithinPartitions(partitioner)
根據給定的分割槽器對RDD進行重新分割槽,並在每個結果分割槽中按鍵分類記錄。這比repartition在每個分割槽中呼叫然後排序更高效,因為它可以將排序推送到洗牌機器中。
repartitionAndSortWithinPartitions,官方建議,如果需要在repartition重分割槽之後,還要進行排序,建議直接使用repartitionAndSortWithinPartitions運算元。因為該運算元可以一邊進行重分割槽的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,效能可能是要高的。
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by sunxufeng on 2016/6/18.
*/
class Student {
}
//建立key類,key組合鍵為grade,score
case class StudentKey(grade:String,score:Int)
// extends Ordered[StudentKey]{
// def compare(that: StudentKey) : Int = {
// var result:Int = this.grade.compareTo(that.grade)
// if (result == 0){
// result = this.student.compareTo(that.student)
// if(result ==0){
// result = that.score.compareTo(this.score)
// }
// }
// result
// }
//}
object StudentKey {
implicit def orderingByGradeStudentScore[A <: StudentKey] : Ordering[A] = {
// Ordering.by(fk => (fk.grade, fk.student, fk.score * -1))
Ordering.by(fk => (fk.grade, fk.score * -1))
}
}
object Student{
def main(args: Array[String]) {
//定義hdfs檔案索引值
val grade_idx:Int=0
val student_idx:Int=1
val course_idx:Int=2
val score_idx:Int=3
//定義轉化函式,不能轉化為Int型別的,給預設值0
def safeInt(s: String): Int = try { s.toInt } catch { case _: Throwable => 0 }
//定義提取key的函式
def createKey(data: Array[String]):StudentKey={
StudentKey(data(grade_idx),safeInt(data(score_idx)))
}
//定義提取value的函式
def listData(data: Array[String]):List[String]={
List(data(grade_idx),data(student_idx),data(course_idx),data(score_idx))
}
def createKeyValueTuple(data: Array[String]) :(StudentKey,List[String]) = {
(createKey(data),listData(data))
}
//建立分割槽類
import org.apache.spark.Partitioner
class StudentPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[StudentKey]
k.grade.hashCode() % numPartitions
}
}
//設定master為local,用來進行本地除錯
val conf = new SparkConf().setAppName("Student_partition_sort").setMaster("local")
val sc = new SparkContext(conf)
//學生資訊是打亂的
val student_array =Array(
"c001,n003,chinese,59",
"c002,n004,english,79",
"c002,n004,chinese,13",
"c001,n001,english,88",
"c001,n002,chinese,10",
"c002,n006,chinese,29",
"c001,n001,chinese,54",
"c001,n002,english,32",
"c001,n003,english,43",
"c002,n005,english,80",
"c002,n005,chinese,48",
"c002,n006,english,69"
)
//將學生資訊並行化為rdd
val student_rdd = sc.parallelize(student_array)
//生成key-value格式的rdd
val student_rdd2 = student_rdd.map(line => line.split(",")).map(createKeyValueTuple)
//根據StudentKey中的grade進行分割槽,並根據score降序排列
val student_rdd3 = student_rdd2.repartitionAndSortWithinPartitions(new StudentPartitioner(10))
//列印資料
student_rdd3.collect.foreach(println)
}
}