Spark SQL Join原理分析
1. Join問題綜述:
Join有inner
,leftouter
,rightouter
,fullouter
,leftsemi
,leftanti
六種類型,對單獨版本的Join操作,可以將問題表述為:
IterA,IterB為兩個Iterator,根據規則A將兩個Iterator中相應的Row進行合並,然後按照規則B對合並後Row進行過濾。
比如Inner_join,它的合並規則A為:對IterA中每一條記錄,生成一個key,並利用該key從IterB的Map集合中獲取到相應記錄,並將它們進行合並;而對於規則B可以為任意過濾條件,比如IterA和IterB任何兩個字段進行比較操作。
對於IterA和IterB,當我們利用iterA中key去IterB中進行一一匹配時,我們稱IterA為streamedIter
,IterB為BuildIter
或者hashedIter
。即我們流式遍歷streamedIter
中每一條記錄,去hashedIter
中去查找相應匹配的記錄。
而這個查找過程中,即為Build
過程,每一次Build
操作的結果即為一條JoinRow(A,B)
,其中JoinRow(A)
來自streamedIter
,JoinRow(B)
來自BuildIter
,此時這個過程為BuildRight
,而如果JoinRow(B)
來自streamedIter
,JoinRow(A)
來自BuildIter
BuildLeft
,
有點拗口!那麽為什麽要去區分BuildLeft
和BuildRight
呢?對於leftouter
,rightouter
,leftsemi
,leftanti
,它們的Build類型是確定,即left*
為BuildRight
,right*
為BuildLeft
類型,但是對於inner
操作,BuildLeft
和BuildRight
兩種都可以,而且選擇不同,可能有很大性能區別:
BuildIter也稱為hashedIter,即需要將BuildIter構建為一個內存Hash,從而加速Build的匹配過程;此時如果BuildIter和streamedIter大小相差較大,顯然利用小的來建立Hash,內存占用要小很多!
總結一下:Join即由下面幾部分組成:
trait Join {
val joinType: JoinType //Join類型
val streamedPlan: SparkPlan //用於生成streamedIter
val buildPlan: SparkPlan //用於生成hashedIter
val buildSide: BuildSide //BuildLeft或BuildRight
val buildKeys: Seq[Expression] //用於從streamedIter中生成buildKey的表達式
val streamedKeys: Seq[Expression] //用於從hashedIter中生成streamedKey的表達式
val condition: Option[Expression]//對joinRow進行過濾
}
註:對於fullouter,IterA和IterB同時為streamedIter和hashedIter,即先IterA=streamedIter,IterB=hashedIter進行leftouter,然後再用先IterB=streamedIter,IterA=hashedIter進行leftouter,再把兩次結果進行合並。
1.1 幾種Join的實現
1.1.1 InnerJoin
- 利用streamIter中每個srow,從hashedIter中查找匹配項;
-
如果匹配成功,即構建多個JoinRow,否則返回empty
streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) } else { Seq.empty } }
1.1.2 LeftOutJoin
- leftIter即為streamIter,而RightIter即為hashedIter,不可以改變
- 利用streamIter中每個srow,從hashedIter中查找匹配項;
-
如果匹配成功,即構建多個JoinRow,否則返回JoinRow的Build部分為Null
val nullRow = new NullRow() streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) } else { Seq(joinRow.withRight(nullRow)) } }
1.1.3 RightOutJoin
- RightIter即為streamIter,而LeftIter即為hashedIter,不可以改變
- 利用streamIter中每個srow,從hashedIter中查找匹配項;
-
如果匹配成功,即構建多個JoinRow,否則返回JoinRow的Build部分為Null
val nullRow = new NullRow() streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withRight(srow)//註意與LeftOutJoin的區別 val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withLeft(_)).filter(condition) } else { Seq(joinRow.withLeft(nullRow)) } }
1.1.4 LeftSemi
- leftIter即為streamIter,而RightIter即為hashedIter,不可以改變
- 利用streamIter中每個srow,從hashedIter中查找匹配項;
- 如果匹配成功,即返回srow,否則返回empty
-
它不是返回JoinRow,而是返回srow
streamIter.filter{ srow => val matches = hashedIter.get(buildKeys(srow)) if(matches == null) { false //沒有找到匹配項 } else{ if(condition.isEmpty == false) { //需要對`假想`後joinrow進行判斷 val joinRow = new JoinedRow joinRow.withLeft(srow) ! matches.map(joinRow.withLeft(_)).filter(condition).isEmpty } else { true } } }
LeftSemi從邏輯上來說,它即為In判斷。
1.1.5 LeftAnti
- leftIter即為streamIter,而RightIter即為hashedIter,不可以改變
- 利用streamIter中每個srow,從hashedIter中查找匹配項;
- 它匹配邏輯為LeftSemi基本相反,即相當於No In判斷。
- 如果匹配不成功,即返回srow,否則返回empty
-
它不是返回JoinRow,而是返回srow
streamIter.filter{ srow => val matches = hashedIter.get(buildKeys(srow)) if(matches == null) { true //沒有找到匹配項 } else{ if(condition.isEmpty == false) { //需要對`假想`後joinrow進行判斷 val joinRow = new JoinedRow joinRow.withLeft(srow) matches.map(joinRow.withLeft(_)).filter(condition).isEmpty } else { false } } }
1.2 HashJoin與SortJoin
上面描述的Join是需要將BuildIter
在內存中構建為hashedIter
,從而加速匹配過程,因此我們也將這個Join稱為HashJoin。但是建立一個Hash表需要占用大量的內存。
那麽問題來:如果我們的Iter太大,無法建立Hash表怎麽吧?在分布式Join計算下,Join過程中發生在Shuffle階段,如果一個數據集的Key存在數據偏移,很容易出現一個BuildIter
超過內存大小,無法完成Hash表的建立,進而導致HashJoin失敗,那麽怎麽辦?
在HashJoin過程中,針對
BuildIter
建立hashedIter
是為了加速匹配過程中。匹配查找除了建立Hash表這個方法以外,將streamedIter和BuildIter進行排序,也是一個加速匹配過程,即我們這裏說的sortJoin。
排序不也是需要內存嗎?是的,首先排序占用內存比建立一個hash表要小很多,其次排序如果內存不夠,可以將一部分數據Spill到磁盤,而Hash為全內存,如果內存不夠,將會導致整個Shuffle失敗。
下面以InnerJoin的SortJoin實現為例子,講述它與HashJoin的區別:
- streamIter和BuildIter都需要為有序。
-
利用streamIter中每個srow,從BuildIter中順序查找,由於兩邊都是有序的,所以查找代價很小。
val buildIndex = 0 streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) //順序查找 val matches = BuildIter.search(buildKeys(srow), buildIndex) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) buildIndex += matches.length } else { Seq.empty } }
對於FullOuter
Join,如果采用HashJoin方式來實現,代價較大,需要建立雙向的Hash表,而基於SortJoin,它的代價與其他幾種Join相差不大,因此`FullOuter默認都是基於SortJon來實現。
2. Spark中的Join實現
Spark針對Join提供了分布式實現,但是Join操作本質上也是單機進行,怎麽理解?如果要對兩個數據集進行分布式Join,Spark會先對兩個數據集進行Exchange
,即進行ShuffleMap操作,將Key相同數據分到一個分區中,然後在ShuffleFetch過程中利用HashJoin/SortJoin單機版算法來對兩個分區進行Join操作。
另外如果Build端的整個數據集(非一個iter)大小較小,可以將它進行Broadcast操作,從而節約Shuffle的開銷。
因此Spark支持ShuffledHashJoinExec
,SortMergeJoinExec
,BroadcastHashJoinExec
三種Join算法,那麽它怎麽進行選擇的呢?
- 如果build-dataset支持Broadcastable,並且它的大小小於
spark.sql.autoBroadcastJoinThreshold
,默認10M,那麽優先進行BroadcastHashJoinExec - 如果dataset支持Sort,並且
spark.sql.join.preferSortMergeJoin
為True,那麽優先選擇SortMergeJoinExec - 如果dataset不支持Sort,那麽只能選擇
ShuffledHashJoinExec
了- 如果Join同時支持BuildRight和BuildLeft,那麽根據兩邊數據大小,優先選擇數據量小的進行Hash。
這一塊邏輯都在org.apache.spark.sql.execution.JoinSelection
中描述。ps:Spark也對Without joining keys
的Join進行支持,但是不在我們這次討論範圍中。
BroadcastHashJoinExec
val p = spark.read.parquet("/Users/p.parquet")
val p1 = spark.read.parquet("/Users/p1.parquet")
p.joinWith(p1, p("to_module") === p1("to_module"),"inner")
此時由於p和p1的大小都較小,它會默認選擇BroadcastHashJoinExec
== Physical Plan ==
BroadcastHashJoin [_1#269.to_module], [_2#270.to_module], Inner, BuildRight
:- Project p
:- Project p1
SortMergeJoinExec
val p = spark.read.parquet("/Users/p.parquet")
val p1 = spark.read.parquet("/Users/p1.parquet")
p.joinWith(p1, p("to_module") === p1("to_module"),"fullouter")
fullouterJoin不支持Broadcast和ShuffledHashJoinExec,因此為ShuffledHashJoinExec
== Physical Plan ==
SortMergeJoin [_1#273.to_module], [_2#274.to_module], FullOuter
:- Project p
:- Project p1
由於ShuffledHashJoinExec一般情況下,不會被選擇,它的條件比較苛責。
//首先不能進行Broadcast!
private def canBroadcast(plan: LogicalPlan): Boolean = {
plan.statistics.isBroadcastable ||
plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold(10M)
}
//其次spark.sql.join.preferSortMergeJoin必須設置false
//然後build端可以放的進內存!
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
//最後build端和stream端大小必須相差3倍!否則使用sort性能要好。
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
}
//或者RowOrdering.isOrderable(leftKeys)==false
Spark SQL Join原理分析