spark中join的簡單操作
阿新 • 發佈:2018-11-12
(1)RDD之間的join
import org.apache.spark.sql.SparkSession

/**
 * Demonstrates a SQL join between two Datasets registered as temp views.
 * Both inputs are tiny, so `explain()` should show a BroadcastHashJoin.
 */
object joinDemo {

  // BroadcastHashJoin demo
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("joinDemo")
      .master("local[*]")
      .getOrCreate()
    import sparkSession.implicits._

    // FIX: the original sample row read "3,Jerry,apan" — typo for "Japan".
    val lines = sparkSession.createDataset(
      Array("1,hanmeimei,China", "2,tom,USA", "3,Jerry,Japan"))

    // Parse the raw "id,name,country" lines into typed tuples.
    val tupleDS = lines.map { line =>
      val field = line.split(",")
      val id = field(0).toLong
      val name = field(1)
      val country = field(2)
      (id, name, country)
    }
    val df1 = tupleDS.toDF("id", "name", "country")

    // Lookup table: English country name -> Chinese country name.
    val countrys = sparkSession.createDataset(List("China,中國", "USA,美國"))
    val tupleDS2 = countrys.map { line =>
      val fields = line.split(",")
      val ename = fields(0)
      val cname = fields(1)
      (ename, cname)
    }
    val df2 = tupleDS2.toDF("ename", "cname")

    // Register temporary views so the join can be expressed in SQL.
    df1.createTempView("t_user")
    df2.createTempView("t_countrys")

    // Join the two views on country == ename; "Japan" has no match and
    // is dropped by the inner join.
    val res = sparkSession.sql(
      "select u.id,u.name,c.cname from t_user as u join t_countrys as c on country=ename")
    res.show()

    // Print the physical plan to confirm the chosen join strategy.
    res.explain()
    sparkSession.stop()
  }
}
(2)DataFrame之間的join
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Demonstrates a DataFrame-API join with broadcast joins disabled,
 * so `explain()` should show a SortMergeJoin instead.
 */
object joinDemo2 {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("joinDemo")
      .master("local[*]")
      .getOrCreate()
    import sparkSession.implicits._

    // Disable the broadcast-join size threshold (default 10 MB) so small
    // tables are not broadcast, and prefer sort-merge joins.
    sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", true)

    val df1: DataFrame = Seq(
      (0, "tom"),
      (1, "jeryy"),
      (2, "kate")
    ).toDF("id", "name")

    val df2 = Seq(
      (0, 18),
      (1, 20),
      (3, 30)
    ).toDF("aid", "age")

    // FIX: the original called `df2.repartition()` and discarded the result.
    // DataFrames are immutable, so that call was a no-op; it has been removed.

    // Inner join on id == aid; ids 2 and 3 have no partner row and are dropped.
    val res = df1.join(df2, $"id" === $"aid")
    res.explain()
    res.show()

    // FIX: stop the SparkSession (the original leaked it; joinDemo stops its own).
    sparkSession.stop()
  }
}