阿新 • • 發佈:2018-12-31
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
def makeRDD[ T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
}
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
* Return a new RDD by applying a function to all elements of this RDD.
* 一對一的進行RDD的轉換操作,並且產生一個新的RDD儲存所有的elements
def map[U: ClassTag](f: T => U): RDD[U]
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
scala> res1.map(_ * 2).collect
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
scala> sc.makeRDD("zhangzhangshisb").collect
res3: Array[Char] = Array(z, h, a, n, g, z, h, a, n, g, s, h, i, s, b)
* Return a new RDD containing only the elements that satisfy a predicate.
* 過濾的RDD轉換操作
def filter(f: T => Boolean): RDD[T]
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
scala> res1.map(_ * 2)
res7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:27
scala> res7.filter(_>10)
res8: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:29
scala> res8.collect
res9: Array[Int] = Array(12, 14, 16, 18, 20)
* 通過一個演算法將RDD多維化,但是輸出卻是平面的型別
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
scala> res1.flatMap(1 to _)
res10: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at flatMap at <console>:27
scala> res10.collect
res11: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
* 將RDD進行分塊操作,使該RDD區域的所有元素執行此命令
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
scala> res1.mapPartitions(x=>x.filter(_ % 2==0).map("Partition"+_))
res12: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at mapPartitions at <console>:27
scala> res12.collect
res13: Array[String] = Array(Partition2, Partition4, Partition6, Partition8, Partition10)
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
scala> var rdd = sc.makeRDD(1 to 50,5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24
scala> rdd.collect
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
scala> rdd.partitions.size
res16: Int = 5
scala> rdd.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at mapPartitionsWithIndex at <console>:27
scala> res17.collect
res18: Array[String] = Array(0:[1,2,3,4,5,6,7,8,9,10], 1:[11,12,13,14,15,16,17,18,19,20], 2:[21,22,23,24,25,26,27,28,29,30], 3:[31,32,33,34,35,36,37,38,39,40], 4:[41,42,43,44,45,46,47,48,49,50])
* 簡單來說就是隨機抽樣操作
* withReplacement:
* true就是放回抽樣,
* false就是不放回抽樣
* fraction:
* 挑選出來元素的比率
* seed:
* 隨機數生成器的種子;使用相同的種子會得到相同的抽樣結果
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
scala> var rdd = sc.makeRDD(1 to 50,5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24
scala> rdd.sample(true,0.5,5).collect
res19: Array[Int] = Array(8, 8, 8, 9, 14, 14, 16, 16, 20, 21, 22, 22, 25, 27, 28, 28, 32, 33, 36, 36, 45, 45, 48, 48, 49, 49)
* 聯合一個RDD,返回一個組合的RDD,但是兩個RDD的型別得一樣
def union(other: RDD[T]): RDD[T]
scala> sc.makeRDD(5 to 9).union(sc.makeRDD(10 to 15)).collect
res21: Array[Int] = Array(5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did.
* @note This method performs a shuffle internally.
* 返回兩個RDD的交集
def intersection(other: RDD[T]): RDD[T]
scala> sc.makeRDD(5 to 9).intersection(sc.makeRDD(0 to 15)).collect
res22: Array[Int] = Array(6, 8, 7, 9, 5)
* Return a new RDD containing the distinct elements in this RDD.
* 去重但混洗,有點像shuffle的那種?
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
scala> sc.makeRDD(5 to 9).union(sc.makeRDD(0 to 15)).distinct.collect
res23: Array[Int] = Array(4, 0, 8, 12, 13, 1, 9, 5, 14, 6, 10, 2, 15, 11, 7, 3)
* Return a copy of the RDD partitioned using the specified partitioner.
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
scala> var rdd=sc.makeRDD(1 to 80,8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:27
scala> res0.collect
res1: Array[String] = Array(0:[1,2,3,4,5,6,7,8,9,10], 1:[11,12,13,14,15,16,17,18,19,20], 2:[21,22,23,24,25,26,27,28,29,30], 3:[31,32,33,34,35,36,37,38,39,40], 4:[41,42,43,44,45,46,47,48,49,50], 5:[51,52,53,54,55,56,57,58,59,60], 6:[61,62,63,64,65,66,67,68,69,70], 7:[71,72,73,74,75,76,77,78,79,80])
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> rdd.map(x=>(x,x)).partitionBy(new HashPartitioner(5))
res2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at <console>:28
scala> res2.partitions.size
res5: Int = 5
scala> res2.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res6: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at mapPartitionsWithIndex at <console>:30
scala> res6.collect
res7: Array[String] = Array(0:[(5,5),(10,10),(15,15),(20,20),(25,25),(30,30),(35,35),(40,40),(45,45),(50,50),(55,55),(60,60),(65,65),(70,70),(75,75),(80,80)], 1:[(1,1),(6,6),(11,11),(16,16),(21,21),(26,26),(31,31),(36,36),(41,41),(46,46),(51,51),(56,56),(61,61),(66,66),(71,71),(76,76)], 2:[(2,2),(7,7),(12,12),(17,17),(22,22),(27,27),(32,32),(37,37),(42,42),(47,47),(52,52),(57,57),(62,62),(67,67),(72,72),(77,77)], 3:[(3,3),(8,8),(13,13),(18,18),(23,23),(28,28),(33,33),(38,38),(43,43),(48,48),(53,53),(58,58),(63,63),(68,68),(73,73),(78,78)], 4:[(4,4),(9,9),(14,14),(19,19),(24,24),(29,29),(34,34),(39,39),(44,44),(49,49),(54,54),(59,59),(64,64),(69,69),(74,74),(79,79)])
* 根據Key進行聚合操作
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
scala> val rdd1 = sc.makeRDD(Array((1,1),(1,2),(1,5),(2,3),(2,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at makeRDD at <console>:25
scala> rdd1.reduceByKey(_+_).collect
res8: Array[(Int, Int)] = Array((2,12), (1,8), (3,6))
* 延時處理,但是實際開發,reduceBykey用的更多,將key相同的value聚集到一起
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
scala> val rdd1 = sc.makeRDD(Array((1,1),(1,2),(1,5),(2,3),(2,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at makeRDD at <console>:25
scala> rdd1.groupByKey().collect
res10: Array[(Int, Iterable[Int])] = Array((2,CompactBuffer(3, 4, 5)), (1,CompactBuffer(1, 2, 5)), (3,CompactBuffer(6)))
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)]
scala> var rdd2 = sc.makeRDD(Array(("a",90),("a",80),("a",87),("b",89),("b",74),("c",77),("c",99)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd2.combineByKey(v=>(v,1),(c:(Int,Int),v)=>(c._1+v,c._2+1),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))
res2: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[1] at combineByKey at <console>:27
scala> rdd2.collect
res3: Array[(String, Int)] = Array((a,90), (a,80), (a,87), (b,89), (b,74), (c,77), (c,99))
scala> res2.map { case (k, (sum, cnt)) => (k, sum / cnt) }
res6: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:29
scala> res6.collect
res7: Array[(String, Int)] = Array((b,81), (a,85), (c,88))
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[(Int, Int)] = Array((1,3), (1,2), (2,4), (2,6), (2,8), (2,7), (3,9))
scala> var agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26
scala> agg.collect
res1: Array[(Int, Int)] = Array((1,5), (2,21), (3,9))
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> var fold = rdd.foldByKey(0)(_+_)
fold: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at foldByKey at <console>:26
scala> fold.collect
res2: Array[(Int, Int)] = Array((1,5), (2,25), (3,9))
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.sortByKey().collect
res3: Array[(Int, Int)] = Array((1,3), (1,2), (2,4), (2,6), (2,8), (2,7), (3,9))
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
scala> var rdd1 = sc.makeRDD(1 until(9))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:24
scala> rdd1.sortBy(x=>x).collect
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)
scala> rdd1.sortBy(1 / _).collect
res10: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 1)
//JOIN 只留下雙方都有的KEY
//left JOIN 留下左邊RDD的資料
//right JOIN 留下右邊RDD的資料
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
scala> var rdd=sc.makeRDD(Array((1,2),(2,3)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd1=sc.makeRDD(Array((2,4),(3,5)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd.join(rdd1).collect
res15: Array[(Int, (Int, Int))] = Array((2,(3,4)))
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))]
scala> var rdd=sc.makeRDD(Array((1,2),(2,3)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd1=sc.makeRDD(Array((2,4),(3,5)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd.cogroup(rdd1).collect
res16: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((2,(CompactBuffer(3),CompactBuffer(4))), (1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(),CompactBuffer(5))))
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
scala> rdd.partitions.size
res18: Int = 2
scala> rdd.coalesce(1)
res19: org.apache.spark.rdd.RDD[(Int, Int)] = CoalescedRDD[35] at coalesce at <console>:27
scala> res19.partitions.size
res20: Int = 1
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
scala> rdd.repartition(1)
res21: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[39] at repartition at <console>:27
scala> res21.partitions.size
res22: Int = 1
def reduce(f: (T, T) => T): T
scala> var rdd=sc.makeRDD(Array[Int](1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at makeRDD at <console>:24
scala> rdd.reduce(_*_)
res26: Int = 362880
def collect(): Array[T]
scala> rdd.collect
res27: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
def count(): Long
scala> rdd.count
res28: Long = 9
def first(): T
scala> rdd.first
res29: Int = 1
def take(num: Int): Array[T]
scala> rdd.take(5)
res30: Array[Int] = Array(1, 2, 3, 4, 5)
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T]
scala> rdd.takeSample(true,5)
res31: Array[Int] = Array(1, 1, 5, 9, 4)
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
scala> rdd.takeOrdered(4)
res34: Array[Int] = Array(1, 2, 3, 4)
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
scala> rdd.aggregate(1)(
| (_*_),
| (_+_))
res35: Int = 15145
def fold(zeroValue: T)(op: (T, T) => T): T
scala> rdd.fold(1)(_*_)
res42: Int = 362880
def saveAsTextFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
def saveAsObjectFile(path: String): Unit
def countByKey(): Map[K, Long]
scala> var rdd=sc.makeRDD((List((1,2),(1,100),(1,14),(2,18),(2,14),(3,77))))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[44] at makeRDD at <console>:24
scala> rdd.countByKey
res47: scala.collection.Map[Int,Long] = Map(2 -> 2, 1 -> 3, 3 -> 1)
def foreach(f: T => Unit): Unit
scala> rdd.foreach(print(_))