Spark Operators in Detail
阿新 • Published: 2018-11-02
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)

Definition:

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {}

From the definition we can see that the value type of the returned RDD is C, i.e. the type that createCombiner constructs and returns. The official documentation reads:

/**
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
 *
 * Users provide three functions:
 *
 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
 *
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 */

Put plainly, combineByKey combines the values that share a key, possibly into a different result type:
- createCombiner builds a new value of type C from a value; C is also the value type of the RDD that combineByKey returns (the key type stays the same).
- mergeValue merges another value with the same key into an existing C; the C acts as an accumulator (within a single partition).
- mergeCombiners merges two C's into one (across partitions).

An example (textRDD has type (String, String)):

scala> val textRDD = sc.parallelize(List(("A", "aa"), ("B","bb"),("C","cc"),("C","cc"), ("D","dd"), ("D","dd")))
textRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val combinedRDD = textRDD.combineByKey(
     |   value => (1, value),
     |   (c:(Int, String), value) => (c._1+1, c._2),
     |   (c1:(Int, String), c2:(Int, String)) => (c1._1+c2._1, c1._2)
     | )
combinedRDD: org.apache.spark.rdd.RDD[(String, (Int, String))] = ShuffledRDD[1] at combineByKey at <console>:26

scala> combinedRDD.collect.foreach(x=>{
     |   println(x._1+","+x._2._1+","+x._2._2)
     | })
D,2,dd
A,1,aa
B,1,bb
C,2,cc

A second example:

scala> val textRDD = sc.parallelize(List(("A", "aa"), ("B","bb"),("C","cc"),("C","cc"), ("D","dd"), ("D","dd")))
textRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val combinedRDD2 = textRDD.combineByKey(
     |   value => 1,
     |   (c:Int, v:String) => (c+1),
     |   (c1:Int, c2:Int) => (c1+c2)
     | )
combinedRDD2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at combineByKey at <console>:26

scala> combinedRDD2.collect.foreach(x=>{
     |   println(x._1+","+x._2)
     | })
D,2
A,1
B,1
C,2

The two calls do the same thing, counting how many times each key appears; only the return types differ. The first returns (String, (Int, String)), the second returns (String, Int).
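As an extra illustrative sketch, combineByKey is often used to compute a per-key average by carrying a (sum, count) pair as the combiner. The data and the names scores and avgByKey below are made up, and a SparkContext sc is assumed as in the shell sessions above.

// Hypothetical data; per-key average with a (sum, count) accumulator.
val scores = sc.parallelize(List(("A", 10.0), ("A", 20.0), ("B", 30.0)))

val avgByKey = scores.combineByKey(
  (v: Double) => (v, 1),                                              // createCombiner: first value seen for a key in a partition
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold another value into the (sum, count) accumulator
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners: merge accumulators across partitions
).mapValues { case (sum, count) => sum / count }

avgByKey.collect.foreach(println)   // expected: (A,15.0) and (B,30.0), in some order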
aggregate

aggregate is used to aggregate the elements of an RDD: seqOp first aggregates the T-type elements within each partition into a value of type U, then combOp aggregates the per-partition U results into a single U. Note that both seqOp and combOp use zeroValue, whose type is U. The parameters are much like those of combineByKey. The key point is that aggregate first computes within each partition and then combines the results across partitions.

/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using
 * given combine functions and a neutral "zero value". This function can return a different result
 * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
 * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
 * allowed to modify and return their first argument instead of creating a new U to avoid memory
 * allocation.
 *
 * @param zeroValue the initial value for the accumulated result of each partition for the
 *                  `seqOp` operator, and also the initial value for the combine results from
 *                  different partitions for the `combOp` operator - this will typically be the
 *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
 * @param seqOp an operator used to accumulate results within a partition
 * @param combOp an associative operator used to combine results from different partitions
 */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}

Example (entered in the spark shell). Note that the initial value here is a tuple, and that tuple type is also the output type of aggregate. The call counts the letters and concatenates them at the same time:

scala> val textRDD = sc.parallelize(List("A", "B", "C", "D", "D", "E"))
textRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val resultRDD = textRDD.aggregate((0, ""))((acc, value)=>{(acc._1+1, acc._2+":"+value)}, (acc1, acc2)=> {(acc1._1+acc2._1, acc1._2+":"+acc2._2)})
resultRDD: (Int, String) = (6,::D:E::D::A::B:C)

A second example: the initial value is 20000, of type Int, so the output type is also Int. The call adds the ASCII codes of all the letters on top of 20000:

scala> val textRDD = sc.parallelize(List('A', 'B', 'C', 'D', 'D', 'E'))
textRDD: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val resultRDD2 = textRDD.aggregate[Int](20000)((acc, cha) => {acc+cha}, (acc1, acc2)=>{acc1+acc2})
resultRDD2: Int = 100403

The letter codes sum to 403, yet the result is 100403 rather than 20403: zeroValue is applied once per partition by seqOp and once more by combOp on the driver, so with four partitions here it is counted five times.

collect()

Returns all the elements of the RDD. Note that this pulls the data of every partition back to the driver, so if the data set is large (larger than a single node can hold), calling it can cause problems.

countByValue()

Its definition is:

def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
  map(value => (value, null)).countByKey()
}

The RDD it is called on does not have to be a pair RDD. It returns a Map whose keys are the distinct elements and whose values, of type Long, give how many times each element occurs.

An example:

scala> val textRDD = sc.parallelize(List('A', 'B', 'C', 'D', 'D', 'E'))
textRDD: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> textRDD.countByValue()
res7: scala.collection.Map[Char,Long] = Map(E -> 1, A -> 1, B -> 1, C -> 1, D -> 2)
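Since the definition above shows that countByValue is just a map followed by countByKey, the following sketch mirrors that equivalence on the same textRDD of Chars, using 1 instead of null as the dummy value; the names counts and distributedCounts are illustrative only.

// Equivalent counting, mirroring the countByValue definition shown above.
val counts = textRDD.map(value => (value, 1)).countByKey()
// counts contains Map(A -> 1, B -> 1, C -> 1, D -> 2, E -> 1)

// countByValue/countByKey are actions that return the whole map to the driver; when the
// number of distinct elements is very large, reduceByKey keeps the counts as an RDD instead.
val distributedCounts = textRDD.map(value => (value, 1L)).reduceByKey(_ + _)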
mapValues(func)

Description: Apply a function to each value of a pair RDD without changing the key.
Example (with rdd = {(1, 2), (3, 4), (3, 6)}): rdd.mapValues(x => x+1)
Result: {(1, 3), (3, 5), (3, 7)}

flatMapValues(func)

Definition:

/**
 * Pass each value in the key-value pair RDD through a flatMap function without changing the
 * keys; this also retains the original RDD's partitioning.
 */
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {}

From the definition we can see that flatMapValues again returns a key-value RDD and leaves the key type unchanged (the value type may change to U). Its argument is a function (call it method); method takes one parameter and returns a TraversableOnce[U]. What is TraversableOnce? The official description is quoted below; informally, TraversableOnce is an interface for collections that can be traversed and iterated over.

A template trait for collections which can be traversed either once only or one or more times.

flatMapValues passes each value of a key-value RDD to the TraversableOnce-returning function; the key stays the same, and every element that the function produces becomes a value paired with that same key.

Example:
rdd is {(1, 2), (3, 4), (3, 6)}
rdd.flatMapValues(x => (x to 5))
Here x stands for the values of rdd, i.e. 2, 4 and 6. The result is:
{(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)}

Another example:

val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.flatMapValues(x=>1 to x)
b.collect.foreach(println(_))
/*
(1,1)
(1,2)
(3,1)
(3,2)
(3,3)
(3,4)
(5,1)
(5,2)
(5,3)
(5,4)
(5,5)
(5,6)
*/

fold(zero)(func)

This method works like reduce, except that fold takes a "zero" value as a parameter, and there are as many "zero" values involved as there are partitions holding the data. The function first folds the data within each partition and then folds the per-partition results together, so one "zero" value is included for every partition (and, as the second example below shows, one more when the partition results are combined).

scala> val textRDD = sc.parallelize(List("A", "B", "C", "D", "D", "E"))
textRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> textRDD.reduce((a, b)=> (a+b))
res11: String = DBCADE

scala> textRDD.fold("")((a, b)=>(a+b))
res12: String = BCDEDA

scala> var rdd = sc.parallelize(1 to 10, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> rdd.fold(0)((a,b)=>(a+b))
res36: Int = 55

scala> rdd.partitions.length
res38: Int = 2

scala> rdd.fold(1)((a,b)=>(a+b))
res37: Int = 58

The second example has two partitions. Why is the result 58 (55 + 3) rather than 57? Because partition 1 and partition 2 each contribute one zero value, and one more zero value is included when the two partition results are combined.

mapValues(func)

This function operates on the values of a key-value RDD while leaving the keys untouched. That is, it changes the values of the RDD but not the keys, and the result is still a key-value RDD whose values may differ from the original ones.

The following example adds 1 to every value of the RDD:

scala> val textRDD = sc.parallelize(List((1, 3), (3, 5), (3, 7)))
textRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val mappedRDD = textRDD.mapValues(value => {value+1})
mappedRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[11] at mapValues at <console>:26

scala> mappedRDD.collect.foreach(println)
(1,4)
(3,6)
(3,8)

keys

Description: Return an RDD of just the keys.
Example: rdd.keys
Result: {1, 3, 3}

values

Description: Return an RDD of just the values.
Example: rdd.values
Result: {2, 4, 6}
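The keys and values snippets above are schematic; a minimal runnable sketch on the same {(1, 2), (3, 4), (3, 6)} data, assuming a SparkContext sc as in the shell sessions above, looks like this (note that keys and values are defined without parentheses):

// Illustrative data matching the pseudo-example above.
val rdd = sc.parallelize(List((1, 2), (3, 4), (3, 6)))

rdd.keys.collect.foreach(println)     // prints 1, 3, 3 (duplicate keys are kept)
rdd.values.collect.foreach(println)   // prints 2, 4, 6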
groupByKey()

Description: Group values with the same key.
Example: rdd.groupByKey()
Input data: {(1, 2), (3, 4), (3, 6)}
Result: {(1, [2]), (3, [4, 6])}

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val groupRDD = rdd.groupByKey
groupRDD: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[4] at groupByKey at <console>:26

scala> groupRDD.collect.foreach(print)
(1,CompactBuffer(2))(3,CompactBuffer(4, 6))

The element type of groupRDD above is (Int, Iterable[Int]).

reduceByKey(func)

Purpose: operates on a key-value RDD and combines the values that share the same key.

An example: concatenate the values that share a key, separated by semicolons.

scala> val textRDD = sc.parallelize(List(("A", "aa"), ("B","bb"),("C","cc"),("C","cc"), ("D","dd"), ("D","dd")))
textRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val reducedRDD = textRDD.reduceByKey((value1,value2) => {value1+";"+value2})
reducedRDD: org.apache.spark.rdd.RDD[(String, String)] = ShuffledRDD[9] at reduceByKey at <console>:26

scala> reducedRDD.collect.foreach(println)
(D,dd;dd)
(A,aa)
(B,bb)
(C,cc;cc)

scala> sc.parallelize(List((1,2),(3,4),(3,6)))
res0: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> res0.reduceByKey(_+_)
res1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at reduceByKey at <console>:27

scala> res1.collect.foreach(println)
(1,2)
(3,10)

sortByKey()

Description: Return an RDD sorted by the key.
Example: rdd.sortByKey()
Result: {(1, 2), (3, 4), (3, 6)}
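A runnable sketch for sortByKey on made-up data, assuming a SparkContext sc; the ascending parameter is part of sortByKey's signature and defaults to true:

val pairs = sc.parallelize(List((3, 4), (1, 2), (3, 6)))

// Ascending sort by key (the default).
pairs.sortByKey().collect.foreach(println)                   // (1,2) (3,4) (3,6)

// Descending sort by key; the relative order of equal keys is not guaranteed.
pairs.sortByKey(ascending = false).collect.foreach(println)  // the (3, _) pairs first, then (1,2)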
reduce(func)

Its definition is:

/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T = withScope {}

Its argument is a function (call it methodA); methodA takes two values of the same type and returns a single value of that type, which is exactly what lets reduce "reduce" the RDD. Note that the return type of reduce is the same as the parameter type of methodA.

Let's run an example:

scala> val textRDD = sc.parallelize(List("A", "B", "C", "D", "D", "E"))
textRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> textRDD.reduce((a, b)=> (a+b))
res11: String = DBCADE

subtractByKey

Definition:

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope {}

Purpose: Return an RDD with the pairs from this whose keys are not in other.

scala> val textRDD = sc.parallelize(List((1, 3), (3, 5), (3, 7)))
textRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> val textRDD2 = sc.parallelize(List((3,9)))
textRDD2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> val subtractRDD = textRDD.subtractByKey(textRDD2)
subtractRDD: org.apache.spark.rdd.RDD[(Int, Int)] = SubtractedRDD[18] at subtractByKey at <console>:28

scala> subtractRDD.collect.foreach(println)
(1,3)

join – inner join

Definition:

/**
 * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
 * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
 */
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {}

From the definition we can see that join takes an RDD as its argument and returns an RDD. Each element of the returned RDD is a tuple whose key has the key type shared by the two RDDs and whose value is itself a tuple: for RDD1.join(RDD2), V is the value type of RDD1 and W is the value type of RDD2. With this much analysis the purpose of the function is already roughly clear.

An example:

scala> val textRDD = sc.parallelize(List((1, 3), (3, 5), (3, 7), (3, 8), (3, 9)))
textRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val textRDD2 = sc.parallelize(List((3,9), (3,4)))
textRDD2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24

scala> val joinRDD = textRDD.join(textRDD2)
joinRDD: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[33] at join at <console>:28

scala> joinRDD.collect.foreach(println)
(3,(5,9))
(3,(5,4))
(3,(7,9))
(3,(7,4))
(3,(8,9))
(3,(8,4))
(3,(9,9))
(3,(9,4))

leftOuterJoin

Much like join, with one difference. First an example:

scala> val textRDD = sc.parallelize(List((1, 3), (3, 5), (3, 7), (3, 8), (3, 9)))
textRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val textRDD2 = sc.parallelize(List((3,9), (3,4)))
textRDD2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24

scala> val joinRDD = textRDD.leftOuterJoin(textRDD2)
joinRDD: org.apache.spark.rdd.RDD[(Int, (Int, Option[Int]))] = MapPartitionsRDD[36] at leftOuterJoin at <console>:28

scala> joinRDD.collect.foreach(println)
(1,(3,None))
(3,(5,Some(9)))
(3,(5,Some(4)))
(3,(7,Some(9)))
(3,(7,Some(4)))
(3,(8,Some(9)))
(3,(8,Some(4)))
(3,(9,Some(9)))
(3,(9,Some(4)))

As the example shows, every key of textRDD (the left side) is kept; for a key of textRDD that has no match in textRDD2, the right-hand value is None, which is why the result value type is (Int, Option[Int]).
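As a small follow-up sketch, when a default value is acceptable the Option produced by leftOuterJoin can be unwrapped with getOrElse; the default 0 and the name filled are illustrative only:

val filled = textRDD.leftOuterJoin(textRDD2).mapValues {
  case (left, rightOpt) => (left, rightOpt.getOrElse(0))   // use 0 when the right side has no match
}
filled.collect.foreach(println)   // (1,(3,0)) plus the matched pairs shown above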
rightOuterJoin

This method is the opposite of leftOuterJoin.

scala> val textRDD = sc.parallelize(List((1, 3), (3, 5), (3, 7), (3, 8), (3, 9)))
textRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val textRDD2 = sc.parallelize(List((3,9), (3,4)))
textRDD2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24

scala> val joinRDD = textRDD.rightOuterJoin(textRDD2)
joinRDD: org.apache.spark.rdd.RDD[(Int, (Option[Int], Int))] = MapPartitionsRDD[39] at rightOuterJoin at <console>:28

scala> joinRDD.collect.foreach(println)
(3,(Some(5),9))
(3,(Some(5),4))
(3,(Some(7),9))
(3,(Some(7),4))
(3,(Some(8),9))
(3,(Some(8),4))
(3,(Some(9),9))
(3,(Some(9),4))

cogroup

First an example:

scala> val textRDD = sc.parallelize(List((1, 3), (3, 5), (3, 7), (3, 8), (3, 9)))
textRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val textRDD2 = sc.parallelize(List((3,9), (3,4)))
textRDD2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24

scala> val cogroupRDD = textRDD.cogroup(textRDD2)
cogroupRDD: org.apache.spark.rdd.RDD[(Int, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[41] at cogroup at <console>:28

scala> cogroupRDD.collect.foreach(println)
(1,(CompactBuffer(3),CompactBuffer()))
(3,(CompactBuffer(5, 7, 8, 9),CompactBuffer(9, 4)))

Here is the definition (the variant that cogroups with three other RDDs):

/**
 * For each key k in `this` or `other1` or `other2` or `other3`,
 * return a resulting RDD that contains a tuple with the list of values
 * for that key in `this`, `other1`, `other2` and `other3`.
 */
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {}

With the example and the definition above, what cogroup does should be easy to understand.

countByKey() – action

For a key-value RDD, counts how many times each key occurs.

scala> val textRDD = sc.parallelize(List((1, 3), (3, 5), (3, 7), (3, 8), (3, 9)))
textRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val countRDD = textRDD.countByKey()
countRDD: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 4)

collectAsMap() – action

For a key-value RDD, first collects the elements and then turns them into a map for easy lookup.

scala> val textRDD = sc.parallelize(List((1, 3), (3, 5), (3, 7), (3, 8), (3, 9)))
textRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val countRDD = textRDD.collectAsMap()
countRDD: scala.collection.Map[Int,Int] = Map(1 -> 3, 3 -> 9)

Note that when several pairs share a key, a later value overwrites an earlier one, as the output above shows.
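As a closing sketch, if every value of a duplicated key is needed rather than just the last one, group before collecting; the name allValues is illustrative only:

val allValues = textRDD.groupByKey().mapValues(_.toList).collectAsMap()
// allValues is Map(1 -> List(3), 3 -> List(5, 7, 8, 9)); the order of values within a list is not guaranteed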