spark: Dataset join operations
阿新 · Published 2019-01-24
The following joins two Datasets that have identical column names on equality of the same-named key columns; each join-key column appears only once in the result (same-named columns that are not join keys, such as c below, still appear twice):
val df1 = Seq((1, 2, 3),(1, 1, 1)).toDF("a", "b", "c")
val df2 = Seq((1, 2, 4),(2, 2, 2)).toDF("a", "b", "c")
df1.show
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 1| 1| 1|
+---+---+---+
df2.show
+---+---+---+
| a| b| c|
+---+---+---+
|  1|  2|  4|
| 2| 2| 2|
+---+---+---+
df1.join(df2, Seq("a","b"), "inner").show
+---+---+---+---+
| a| b| c| c|
+---+---+---+---+
| 1| 2| 3| 4|
+---+---+---+---+
df1.join(df2, Seq("a","b"), "outer").show
+---+---+----+----+
| a| b| c| c|
+---+---+----+----+
| 2| 2|null| 2|
|  1|  2|   3|   4|
| 1| 1| 1|null|
+---+---+----+----+
df1.join(df2, Seq("a","b"), "right").show
+---+---+----+---+
| a| b| c| c|
+---+---+----+---+
| 1| 2| 3| 4|
| 2| 2|null| 2|
+---+---+----+---+
df1.join(df2, Seq("a","b"), "left").show
+---+---+---+----+
| a| b| c| c|
+---+---+---+----+
|  1|  2|  3|   4|
| 1| 1| 1|null|
+---+---+---+----+
df1.join(df2, Seq("a","b"), "leftsemi").show
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
+---+---+---+
df1.join(df2, Seq("a","b"), "leftanti").show
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 1| 1|
+---+---+---+
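The leftsemi/leftanti variants above return only df1's columns, but the other joins leave two columns both named c in the result. One way to tell them apart is to reference each column through its original DataFrame and rename. A minimal self-contained sketch; the local SparkSession setup and the c_left/c_right names are assumptions for experimenting, not part of the article:

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation (assumed setup).
val spark = SparkSession.builder.master("local[*]").appName("join-demo").getOrCreate()
import spark.implicits._

val df1 = Seq((1, 2, 3), (1, 1, 1)).toDF("a", "b", "c")
val df2 = Seq((1, 2, 4), (2, 2, 2)).toDF("a", "b", "c")

// The using-column join keeps "a" and "b" once, but both "c" columns survive.
// df1("c") and df2("c") are distinct attribute references, so each can be
// selected and renamed individually.
val picked = df1.join(df2, Seq("a", "b"), "inner")
  .select(df1("a"), df1("b"), df1("c").as("c_left"), df2("c").as("c_right"))
```

This avoids the AnalysisException that a plain col("c") would raise on the joined result.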
The following joins two Datasets with entirely different column names on equality of differently named columns; both columns of each pair appear in the join result (since the names differ, Spark cannot keep just one):
val df1 = Seq((1, 2, 3),(1, 1, 1)).toDF("a", "b", "c")
val df2 = Seq((1, 2, 4),(2, 2, 2)).toDF("a1", "b1", "c1")
df1.show
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 1| 1| 1|
+---+---+---+
df2.show
+---+---+---+
| a1| b1| c1|
+---+---+---+
| 1| 2| 4|
| 2| 2| 2|
+---+---+---+
df1.join(df2, col("a") === col("a1") && col("b") === col("b1"), "outer").show
+----+----+----+----+----+----+
| a| b| c| a1| b1| c1|
+----+----+----+----+----+----+
|null|null|null| 2| 2| 2|
| 1| 2| 3| 1| 2| 4|
| 1| 1| 1|null|null|null|
+----+----+----+----+----+----+
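In the outer join above, the key values are split between (a, b) and (a1, b1), with nulls on the unmatched side. If a single pair of key columns is wanted, the two sides can be merged with coalesce. A sketch over the same data; the session setup is an assumption for experimenting:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col}

// Local session for experimentation (assumed setup).
val spark = SparkSession.builder.master("local[*]").appName("join-keys").getOrCreate()
import spark.implicits._

val df1 = Seq((1, 2, 3), (1, 1, 1)).toDF("a", "b", "c")
val df2 = Seq((1, 2, 4), (2, 2, 2)).toDF("a1", "b1", "c1")

// Merge each key pair into one column by taking the non-null side.
val merged = df1.join(df2, col("a") === col("a1") && col("b") === col("b1"), "outer")
  .select(
    coalesce(col("a"), col("a1")).as("a"),
    coalesce(col("b"), col("b1")).as("b"),
    col("c"), col("c1"))
```

Every row has at least one non-null side of each key pair, so the merged key columns contain no nulls.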
If two Datasets have some column names in common and some different, and the join condition mixes same-named and differently named columns, there are several approaches:
val df1 = Seq((1, 2, 3),(1, 1, 1)).toDF("a", "b", "c")
val df2 = Seq((1, 2, 4),(2, 2, 2)).toDF("a", "b1", "d")
df1.show
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 1| 1| 1|
+---+---+---+
df2.show
+---+---+---+
| a| b1| d|
+---+---+---+
| 1| 2| 4|
| 2| 2| 2|
+---+---+---+
// Join condition: df1("a") == df2("a") && df1("b") == df2("b1")
// Joining directly fails with: org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be:...
df1.join(df2, col("a") === col("a") && col("b") === col("b1"), "outer").show
// It works if the ambiguous column is qualified through its DataFrame:
df1.join(df2, df1("a") === df2("a") && col("b") === col("b1"), "outer").show
+----+----+----+----+----+----+
| a| b| c| a| b1| d|
+----+----+----+----+----+----+
|null|null|null| 2| 2| 2|
| 1| 2| 3| 1| 2| 4|
| 1| 1| 1|null|null|null|
+----+----+----+----+----+----+
// Alternatively, rename columns in one Dataset so the key names are all the same (or all different), then join as shown above
df1.join(df2.withColumnRenamed("b1", "b"), Seq("a", "b"), "outer").show
+---+---+----+----+
| a| b| c| d|
+---+---+----+----+
| 2| 2|null| 2|
| 1| 2| 3| 4|
| 1| 1| 1|null|
+---+---+----+----+
// You can also name a Dataset with its as method (equivalent to alias) and use the alias to resolve the ambiguity (a Dataset alias works like a table alias in SQL)
df1.alias("df1")
.join(df2.as("df2"), col("df1.a") === col("df2.a") && col("b") === col("b1"), "outer")
.show
+----+----+----+----+----+----+
| a| b| c| a| b1| d|
+----+----+----+----+----+----+
|null|null|null| 2| 2| 2|
| 1| 2| 3| 1| 2| 4|
| 1| 1| 1|null|null|null|
+----+----+----+----+----+----+
// To keep only df2's a column:
df1.alias("df1")
.join(df2.as("df2"), col("df1.a") === col("df2.a") && col("b") === col("b1"), "outer")
.drop(col("df1.a")).show
+----+----+----+----+----+
| b| c| a| b1| d|
+----+----+----+----+----+
|null|null| 2| 2| 2|
| 2| 3| 1| 2| 4|
| 1| 1|null|null|null|
+----+----+----+----+----+
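The same result can be had without string aliases: Dataset.drop also accepts a Column, and df1("a") and df2("a") are distinct attribute references even though the names collide. A sketch with an assumed local session:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for experimentation (assumed setup).
val spark = SparkSession.builder.master("local[*]").appName("join-drop").getOrCreate()
import spark.implicits._

val df1 = Seq((1, 2, 3), (1, 1, 1)).toDF("a", "b", "c")
val df2 = Seq((1, 2, 4), (2, 2, 2)).toDF("a", "b1", "d")

// drop(df1("a")) removes only df1's column, leaving df2's "a" in place,
// because the Column carries the attribute reference, not just the name.
val t = df1.join(df2, df1("a") === df2("a") && col("b") === col("b1"), "outer")
  .drop(df1("a"))
```

A string-based drop("a") would be ambiguous here, so the Column form is the safer choice.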
Addendum:
The Dataset as method (equivalent to alias) gives a Dataset an alias, similar to a table alias in SQL.
val df = Seq((1, 2),(1, 1)).toDF("a", "b")
df.select("a").show
+---+
| a|
+---+
| 1|
| 1|
+---+
df.select("df.a").show
// Fails with: org.apache.spark.sql.AnalysisException: cannot resolve '`df.a`' given input columns: [a, b];
df.as("df").select("df.a").show
+---+
| a|
+---+
| 1|
| 1|
+---+
Note:
The as method used above refers to the aliasing overloads def as(alias: String): Dataset[T] / def as(alias: Symbol): Dataset[T].
Dataset also has another as method, def as[U](implicit arg0: Encoder[U]): Dataset[U], which is unrelated to this article.
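For contrast, the typed overload as[U] converts the untyped rows into a Dataset of a case class rather than naming the Dataset. A quick sketch; the session setup and the Rec case class are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation (assumed setup).
val spark = SparkSession.builder.master("local[*]").appName("typed-as").getOrCreate()
import spark.implicits._

// Column names must match the case class fields for the implicit Encoder
// to resolve them.
case class Rec(a: Int, b: Int)
val ds = spark.createDataFrame(Seq((1, 2), (1, 1))).toDF("a", "b").as[Rec]
```

Here ds is a Dataset[Rec], so collect() yields Rec instances instead of generic Rows.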