1. 程式人生 > Scala版運算元(包括:join,leftOuterJoin,rightOuterJoin,fullOuterJoin,distinct,saveAsTextFile,foreachPartition,mapPartitions)【程式碼】

Scala版運算元(包括:join,leftOuterJoin,rightOuterJoin,fullOuterJoin,distinct,saveAsTextFile,foreachPartition,mapPartitions)【程式碼】

package com.bjsxt.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.actors.threadpool.Arrays

import scala.collection.mutable.ListBuffer

/**
 * Demonstrates several Spark RDD operators on small in-memory data sets:
 * saveAsTextFile, foreachPartition, mapPartitions, distinct, the four join
 * variants (join, leftOuterJoin, rightOuterJoin, fullOuterJoin) and partition
 * inspection. Results are written with println.
 */
object SparkJoin {

  /**
   * Entry point. Creates a SparkContext (app name "test", master "local"), runs
   * each demo in turn and always stops the context afterwards.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      partitionOperators(sc)
      distinctOperator(sc)
      joinOperators(sc)
      partitionCount(sc)
    } finally {
      // Release the context even when one of the jobs above fails.
      sc.stop()
    }
  }

  /** Runs saveAsTextFile, foreachPartition and mapPartitions on a two-partition RDD of letters. */
  private def partitionOperators(sc: SparkContext): Unit = {
    val letters = sc.parallelize(Array("a", "b", "b", "b", "c", "d", "d", "e", "f", "g", "h"), 2)

    // saveAsTextFile: stores the RDD's contents as text files under the given directory.
    // NOTE(review): with Spark's default output validation this is expected to fail when
    // ./result already exists, so the directory must be removed before re-running — confirm.
    letters.saveAsTextFile("./result")

    // foreachPartition: walks each partition's iterator purely for its side effect (printing);
    // it returns Unit, so there is nothing to bind to a val.
    letters.foreachPartition(partition => partition.foreach(println))
    println("_____________________________________________________-")

    // mapPartitions: transforms one whole partition at a time, which is the operator to reach
    // for when per-partition work (for example writing to a database) is needed. The elements
    // are mapped lazily instead of being copied into a buffer first.
    val suffixed = letters.mapPartitions(
      partition => partition.map(one => one + "~"),
      preservesPartitioning = true)
    suffixed.foreach(println)
    println("遍歷結束了+++++++++++++++++++++++++++++++++++++++++++++++++=")
  }

  /** Prints the distinct elements of a small RDD that contains duplicates. */
  private def distinctOperator(sc: SparkContext): Unit = {
    val letters = sc.parallelize(Array("a", "b", "b", "b", "c", "d", "d"), 2)
    letters.distinct().foreach(println)
  }

  /** Joins two key/value RDDs with every join variant and prints each result. */
  private def joinOperators(sc: SparkContext): Unit = {
    val ages = sc.parallelize(
      Array(("zhangsan", "18"), ("lisi", "23"), ("wangwu", "20"), ("maliu", "50")))
    val heights = sc.parallelize(
      Array(("zhangsan", "180"), ("lisi", "230"), ("wangwu", "200"), ("tianqi", "500")))

    // join: inner join, keeps only keys present in both RDDs.
    ages.join(heights).foreach(println)

    // leftOuterJoin: keeps every key of the left RDD; the right-hand value is an Option.
    val leftJoined = ages.leftOuterJoin(heights)
    leftJoined.foreach(println)
    println("leftouterjoin的細化操作:")
    leftJoined.foreach { case (key, (value1, options)) =>
      // Substitute a placeholder when the right side has no value for this key.
      val rightValue = options.getOrElse("fdjkljsfgshskjfdh")
      println(s"key=>$key,value1=>$value1,options=>$rightValue")
    }

    // rightOuterJoin: keeps every key of the right RDD; the left-hand value is an Option.
    ages.rightOuterJoin(heights).foreach(println)

    // fullOuterJoin: keeps the keys of both RDDs; both values are Options.
    ages.fullOuterJoin(heights).foreach(println)
  }

  /** Prints how many partitions an RDD was created with, then runs a count job on it. */
  private def partitionCount(sc: SparkContext): Unit = {
    // parallelize builds an RDD from a local collection; for key/value data the Java API's
    // counterpart is parallelizePairs.
    val numbers = sc.parallelize(Array(1, 2, 3, 4, 5), 3)
    println("rdd partitions lengt=" + numbers.partitions.length)
    // count() is an action and triggers a job; its result is intentionally discarded here.
    numbers.count()
  }

}