Scala版運算元(包括:join,leftjoin,rightjoin,fulljoin,distinct,saveAsTextFile,foreachPartition,mapPartitions)【程式碼】
阿新 • • 發佈:2019-01-30
package com.bjsxt.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import scala.collection.mutable.ListBuffer

/**
 * Demo of common Spark RDD operators: saveAsTextFile, foreachPartition,
 * mapPartitions, distinct, join, leftOuterJoin, rightOuterJoin,
 * fullOuterJoin, and partition inspection.
 *
 * Runs locally ("local" master); intended as a teaching example, so most
 * results are simply printed.
 */
object SparkJoin {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd6 = sc.parallelize(Array("a", "b", "b", "b", "c", "d", "d", "e", "f", "g", "h"), 2)

    // saveAsTextFile: persist the RDD as text files (one part-file per partition).
    // NOTE(review): this throws if "./result" already exists — delete it between runs.
    rdd6.saveAsTextFile("./result")

    // foreachPartition: an action that traverses each partition once.
    // It returns Unit, so (unlike the original code) we do not bind its result.
    rdd6.foreachPartition { iter =>
      while (iter.hasNext) {
        println(iter.next())
      }
    }
    println("_____________________________________________________-")

    // mapPartitions: transformation invoked once per partition — the right tool
    // when per-partition setup is needed (e.g. opening one database connection
    // per partition instead of one per element).
    val rdd7 = rdd6.mapPartitions(iter => {
      val list = ListBuffer[String]()
      while (iter.hasNext) {
        val one = iter.next()
        list += (one + "~")
      }
      list.iterator
    }, preservesPartitioning = true)
    rdd7.foreach(println)
    println("遍歷結束了+++++++++++++++++++++++++++++++++++++++++++++++++=")

    // distinct: de-duplicate elements across partitions.
    val rdd5 = sc.parallelize(Array("a", "b", "b", "b", "c", "d", "d"), 2)
    val result5 = rdd5.distinct()
    result5.foreach(println)

    // Pair RDDs used to demonstrate the four join flavours.
    // "zhangsan"/"lisi"/"wangwu" exist in both; "maliu" only left, "tianqi" only right.
    val rdd3 = sc.parallelize(Array(("zhangsan", "18"), ("lisi", "23"), ("wangwu", "20"), ("maliu", "50")))
    val rdd4 = sc.parallelize(Array(("zhangsan", "180"), ("lisi", "230"), ("wangwu", "200"), ("tianqi", "500")))

    // join: inner join — only keys present in both RDDs survive.
    val result = rdd3.join(rdd4)
    result.foreach(println)

    // leftOuterJoin: every key from the left RDD; the right-hand value is an
    // Option (None when the key is absent on the right).
    val result2 = rdd3.leftOuterJoin(rdd4)
    result2.foreach(println)
    println("leftouterjoin的細化操作:")
    result2.foreach { tuple =>
      val key = tuple._1
      val value1 = tuple._2._1
      val options = tuple._2._2
      // getOrElse supplies a fallback for keys with no match on the right.
      println("key=>" + key + ",value1=>" + value1 + ",options=>" + options.getOrElse("fdjkljsfgshskjfdh"))
    }

    // rightOuterJoin: every key from the right RDD; left value is an Option.
    val result3 = rdd3.rightOuterJoin(rdd4)
    result3.foreach(println)

    // fullOuterJoin: union of keys; both sides are Options.
    val result4 = rdd3.fullOuterJoin(rdd4)
    result4.foreach(println)

    // Inspect how many partitions an RDD was split into.
    val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5), 3)
    println("rdd partitions length=" + rdd1.partitions.length)
    rdd1.count()

    // Release Spark resources — missing in the original code.
    sc.stop()
  }
}