Join Types in Spark DataFrame
Joins on a Spark DataFrame work much like joins in SQL: there are inner joins, left joins, right joins, and full joins.
So how does the join method support these different join types?
Look at its signatures:
def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
As you can see, the join type is selected by passing a String as the joinType argument.
joinType can be "inner", "left", "right", or "full", corresponding to an inner join, left join, right join, and full join respectively. The default is "inner", i.e. an inner join:
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person")).show()
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "inner").show()
The result is as follows:
id_person | name | address | id_order | orderNum | id_person |
---|---|---|---|---|---|
1 | 張三 | 深圳 | 3 | 533 | 1 |
1 | 張三 | 深圳 | 4 | 444 | 1 |
2 | 李四 | 成都 | 1 | 325 | 2 |
3 | 王五 | 廈門 | 2 | 34 | 3 |
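Note that the first overload shown earlier, the one taking usingColumns: Seq[String], performs an equi-join on columns with the same name and keeps only a single copy of the join key, so the duplicated id_person column in the output above disappears. A minimal sketch on the same two DataFrames (illustrative, not from the original test run, and assuming that overload is available in your Spark version):

// Equi-join on the shared id_person column; the result should contain id_person only once
personDataFrame.join(orderDataFrame, Seq("id_person"), "inner").show()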
"left", "left_outer", or "leftouter" means a left (outer) join:
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "left").show()
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "left_outer").show()
The result is as follows:
id_person | name | address | id_order | orderNum | id_person |
---|---|---|---|---|---|
1 | 張三 | 深圳 | 3 | 533 | 1 |
1 | 張三 | 深圳 | 4 | 444 | 1 |
2 | 李四 | 成都 | 1 | 325 | 2 |
3 | 王五 | 廈門 | 2 | 34 | 3 |
4 | 朱六 | 杭州 | null | null | null |
"right", "right_outer", or "rightouter" means a right (outer) join:
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "right").show()
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "right_outer").show()
The result is as follows:
id_person | name | address | id_order | orderNum | id_person |
---|---|---|---|---|---|
2 | 李四 | 成都 | 1 | 325 | 2 |
3 | 王五 | 廈門 | 2 | 34 | 3 |
1 | 張三 | 深圳 | 3 | 533 | 1 |
1 | 張三 | 深圳 | 4 | 444 | 1 |
null | null | null | 5 | 777 | 11 |
"full", "outer", "full_outer", or "fullouter" means a full (outer) join:
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "full").show()
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "full_outer").show()
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "outer").show()
The result is as follows:
id_person | name | address | id_order | orderNum | id_person |
---|---|---|---|---|---|
1 | 張三 | 深圳 | 3 | 533 | 1 |
1 | 張三 | 深圳 | 4 | 444 | 1 |
2 | 李四 | 成都 | 1 | 325 | 2 |
3 | 王五 | 廈門 | 2 | 34 | 3 |
4 | 朱六 | 杭州 | null | null | null |
null | null | null | 5 | 777 | 11 |
Full Scala test source:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

// Case classes describing the two tables to be joined
case class Persons(id_person: Int, name: String, address: String)
case class Orders(id_order: Int, orderNum: Int, id_person: Int)

object DataFrameTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DataFrameTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Build the two DataFrames; person 4 has no orders, and order 5 references a non-existent person 11
    val personDataFrame = sqlContext.createDataFrame(List(Persons(1, "張三", "深圳"), Persons(2, "李四", "成都"), Persons(3, "王五", "廈門"), Persons(4, "朱六", "杭州")))
    val orderDataFrame = sqlContext.createDataFrame(List(Orders(1, 325, 2), Orders(2, 34, 3), Orders(3, 533, 1), Orders(4, 444, 1), Orders(5, 777, 11)))

    // Inner join (default and explicit)
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person")).show()
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "inner").show()

    // Left outer join
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "left").show()
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "left_outer").show()

    // Right outer join
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "right").show()
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "right_outer").show()

    // Full outer join
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "full").show()
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "full_outer").show()
    personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "outer").show()
  }
}
How is this implemented? Looking at the sql module of the Spark source code, the String is converted into a JoinType.
The JoinType companion object first lowercases the string typ, then strips any underscores _, and finally pattern-matches on the result to decide which join type to use. The source also shows that, besides the inner, left outer, right outer, and full outer joins, there is a LeftSemi join, which the examples above do not exercise: a left semi join keeps only the rows of the left DataFrame that have a match on the right, and returns no columns from the right side.
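As an illustrative sketch (not part of the original test program), a left semi join on the same two DataFrames would be written as below; with the sample data it should list persons 1, 2, and 3 once each, since person 4 has no orders:

// Only person columns are returned, and only for persons with at least one matching order
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "leftsemi").show()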
The JoinType source in Spark:
object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase.replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" => LeftSemi
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter",
        "leftouter", "left",
        "rightouter", "right",
        "leftsemi")
      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

sealed abstract class JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
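Because of the toLowerCase and underscore removal above, the join-type string is effectively case- and underscore-insensitive. For example, the following calls (illustrative, not from the original test program) should both resolve to LeftOuter and produce the same left join result as "left":

// "LEFT_OUTER" and "leftOuter" both normalize to "leftouter"
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "LEFT_OUTER").show()
personDataFrame.join(orderDataFrame, personDataFrame("id_person") === orderDataFrame("id_person"), "leftOuter").show()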