1. 程式人生 > >Spark RDD API解析及實戰

Spark RDD API解析及實戰

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object RDDTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("matrix T").setMaster("local"))//得到SparkContext
/***
      * map[U:ClassTag](f:T=>U):RDD[U]
* 通過函式f將元素型別為T的集合轉為元素型別為U的集合。TU可以相同 */ val rdd1 = sc.parallelize(1 to 10) val rdd2 = rdd1.map(_*2) rdd2.foreach(println) println("-----------------------------------------------------") val rdd3 = rdd1.map{x=> x + 3 } rdd3.foreach(println) println("-----------------------------------------------------"
) val rdd4 = rdd1.map{x=> "No." + x} rdd4.foreach(println) println("-----------------------------------------------------") /*** * flatMap[U:ClassTag](f:T=>TraversableOnce[U]):RDD[U] * 類似於map,但是在元素遍歷處理時返回的是多個元素,最終返回MapPartitionsRDD型別的RDD */ val rdd5 = sc.textFile("hdfs://pc1:9000/input/chenjie.txt"
) /** * rdd5: * hello my name is chenjie * i come from shanghai university * thank you */ val lines = rdd5.flatMap { line => println("執行一次flatMap,line=" + line + ",拆分成:" + line.split(" ")) line.split(" ") } /** * 執行一次flatMap,line=hello my name is chenjie,拆分成:[Ljava.lang.String;@5bd160c3 * 執行一次flatMap,line=i come from shanghai university,拆分成:[Ljava.lang.String;@65203f1f * 執行一次flatMap,line=thank you ,拆分成:[Ljava.lang.String;@13c84074 * * * "hello my name is chenjie" => ["hello","my","name","is","chenjie"] * "i come from shanghai university" => ["i","come","from","shanghai","university"] * "thank you" => ["thank","you"] * 壓扁 => ["hello","my","name","is","chenjie","i","come","from","shanghai","university","thank","you"] */ lines.foreach(println) println("-----------------------------------------------------") val chars = rdd5.flatMap{line => //必須返回多個元素 //Array(line.charAt(0),line.charAt(line.size-1)) import scala.collection.mutable.ArrayBuffer val arrayBufferChar = ArrayBuffer[Char]() arrayBufferChar += line.charAt(0) arrayBufferChar += line.charAt(line.size-1) arrayBufferChar } chars.foreach(println) println("-----------------------------------------------------") /*** * filter(f:T=>Boolean):RDD[T] * 傳入的函式要求返回值是Boolean型別,過濾剩下返回值為true的元素 */ val rdd6 = sc.parallelize(1 to 10) // val filterRDD = rdd val rdd7 = rdd6.filter(_%2==0) rdd7.foreach(println) println("-----------------------------------------------------") val rdd8 = rdd6.filter(x=> if(x > 3 && x < 7) true else false ) rdd8.foreach(println) println("-----------------------------------------------------") /*** * mapPartitions[U:ClassTag](f:Iterator[T]=>Iterator[U],preservesPartitioning:Boolean = false):RDD[U] */ val rdd9 = sc.parallelize(1 to 10) val rdd10 = rdd9.mapPartitions(iter=>iter.filter(_>3)) rdd10.foreach(println) println("-----------------------------------------------------") val rdd11 = rdd9.mapPartitions(iter=> iter.filter(x=> x<5 ) ) /*** * glom():RDD[Array[T]] * 將每個分割槽轉化為陣列 */ val rdd12 = sc.parallelize(1 to 10) val rdd13 = rdd12.glom()//[[1],[2],[3],[4],[5],[6],[7],[8],[9],[10]] //Array[Array[Int]] rdd13.foreach(_.foreach(println)) println("-----------------------------------------------------") /*** * distinct(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[t] * RDD中每個partitioner內部重複的元素去掉 */ val rdd14 = sc.parallelize(Array(1,2,2,2,2,2,4,5,6,7,7,7,7,7,7,8,8,9)) val rdd15 = rdd14.distinct(2) rdd15.foreach(println) println("-----------------------------------------------------") /*** * cartesian[U:ClassTag](other:RDD[U]):RDD[(T,U)] * 在兩個RDD之間,將各自的元素的笛卡爾積以內部元素型別為Tuple形式的RDD返回 */ val rdd16 = sc.parallelize(Array("A1","A2","A3")) val rdd17 = sc.parallelize(Array("B1","B2","B3")) val rdd18 = rdd16.cartesian(rdd17) rdd18.foreach(println) println("-----------------------------------------------------") /*** * union(other:RDD[T]):RDD[T] * 合併 */ val rdd19 = rdd16.union(rdd17) rdd19.foreach(println) println("-----------------------------------------------------") val rdd20 = sc.parallelize(Array(("A",1),("B",2),("C",3))) val rdd21 = rdd20.mapValues(a=>a*2) rdd21.foreach(println) println("-----------------------------------------------------") /** * subtract(other:RDD[T]):RDD[T] * 求兩個RDD之間的差集 */ val rdd22 = sc.parallelize(Array("A","B","C","D")) val rdd23 = sc.parallelize(Array("C","D","E","F")) val rdd24 = rdd22.subtract(rdd23) rdd24.foreach(println) println("-----------------------------------------------------") /*** * sample(withReplacement:Boolean,fraction:Double,seed:Long=Utils.random.nextLong):RDD[T] * 對集合中的元素進行取樣,可以指定取出元素的百分比以及隨機種子 */ val rdd25 = sc.parallelize(Array("A","B","C","D")) val sampleRDD = rdd25.sample(true,0.5,3) sampleRDD.foreach(println) println("-----------------------------------------------------") /** * takeSample(withReplacement:Boolean,num:Int,seed:Long=Utils.random.nextLong):Array[T] * 取樣 */ val rdd26 = sc.parallelize(Array("A","B","C","D")) val rdd27 = rdd26.takeSample(true,3,3) rdd27.foreach(println) println("-----------------------------------------------------") /** * groupBy[K](f:T=>K, p:Partitioner)(implicitkt:ClassTag[K],ord:Ordering[K]=null):RDD[(K,Iterable[T])] * 首先根據傳入的f產生的key,形成元素為K-V形式的RDD,然後呼叫groupByKeykey值相同的元素進行分組 */ val rdd28 = sc.parallelize(Array("V1","V2","U1","W2","U2","V2","W1")) val rdd29 = rdd28.groupBy(_.substring(0,1)) rdd29.foreach(println) println("-----------------------------------------------------") /*** * partitionBy(partitioner:Partioner):RDD[(K,V)] * 只適用於內部元素是K-V形式的RDD,主要是將RDD進行重新分割槽,如果分割槽結果與之前的一致則返回自身,否則產生ShuffledRDD型別的RDD */ val rdd30 = sc.parallelize(Array(("V1",2),("V1",1),("V2",2),("W3",1),("U1",2),("U1",1),("U1",3))) val rdd31 = rdd30.partitionBy(new HashPartitioner(3)) rdd31.foreach(println) println("-----------------------------------------------------") /** * cogroup[W](other:RDD[(K,W)],partitioner:Partitioner):RDD[(K,(Iterable[V]),Iterable[W]))] * 只適用於元素型別為K-VRDD,主要是將兩個RDD中的元素key值相同的元素行合併形成新的K-V鍵值對,其value是每個RDD 元素集合的迭代器構成的Tuple型別的元素 */ val rdd32 = sc.parallelize(Array(("V1",1),("V2",2),("V2",2),("U1",2),("U2",1),("U5",4))) val rdd33 = sc.parallelize(Array(("V1",1),("V8",2),("U1",2),("U5",1))) val rdd34 = rdd32.cogroup(rdd33) rdd34.foreach{line=> println("key=" + line._1) val iter1 = line._2._1 val iter2 = line._2._2 iter1.foreach(println) iter2.foreach(println) } println("-----------------------------------------------------") /** * combineByKey[C](createCombiner:V => C, mergeValue:(C,V)) => C,mergeCombiners:(C,C) => C ):RDD[(K,C)] * 使用於元素型別為K-V形式的RDD,它將每個分割槽中的元素按照Key合併,最後返回ShuffledRDD型別的RDD */ val rdd35 = sc.parallelize(Array(("V1",1),("V1",2),("V2",2),("U3",1),("U1",1),("U2",2))) val rdd36 = rdd35.combineByKey((v:Int)=>List(v),(c:List[Int],v:Int)=>v::c,(c1:List[Int],c2:List[Int])=>c1:::c2) rdd36.foreach(println) println("-----------------------------------------------------") /*** * reduceByKey(func:(V,V)=>V):RDD[(K,V)] * combineByKey相似,但其返回的RDD內部元素型別和原有型別保持一致 */ val rdd37 = sc.parallelize(Array(("V1",1),("V1",2),("V2",2),("U3",1),("U1",1),("U2",2))) val rdd38 = rdd37.reduceByKey(_+_) rdd38.foreach(println) println("-----------------------------------------------------") /** * join[W](other:RDD[(K,W)]):RDD[(K,(V,W))] * 該方法只適用於元素型別為K-VRDD,將兩個RDDkey相同的元素先合併成以KeyKey,以每個RDD中該元素的集合為集合 */ val rdd39 = sc.parallelize(Array(("V1",1),("U5",4))) val rdd40 = sc.parallelize(Array(("V1",4),("U5",4))) val rdd41 = rdd39.join(rdd40) rdd41.foreach(println) println("-----------------------------------------------------") } }