1. 程式人生 > 其它 >sparksql的join有哪些及實現原理

sparksql的join有哪些及實現原理

sparksql的3種join實現

1、Broadcast Join (小表對大表)

在資料庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表。

維度表一般指固定的、變動較少的表,例如聯絡人、物品種類等,一般資料有限。

事實表一般記錄流水,比如銷售清單等,通常隨著時間的增長不斷膨脹。

因為Join 操作是對兩個表中key值相同的記錄進行連線,在SparkSQL中,對兩個表做join最直接的方式是先根據key分割槽,再在每個分割槽中把key值相同的記錄拿出來做

連線操作。但這樣就不可避免地涉及到shuffle,而shuffle在spark中比較耗時的操作,我們應該儘可能的設計Spark應用使其避免大量的shuffle。

當維度表和事實表進行join操作時,為了避免shuffle,我們可以將大小有限的維度表的全部資料分發到每個節點上,供事實表使用。executor儲存維度表的全部資料,一定程度上犧牲了

空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作 Broadcast Join。

 Table B 是較小的表,黑色表示將其廣播到每個executor節點上,Table A 的每個partition 會通過 block manager取到Table A的資料。根據每條記錄的 Join Key 取到

Table B中相對應的記錄,根據 Join Type進行操作。這個過程比較簡單,不做贅述。

ps:禁用廣播命令:  set spark.sql.autoBroadcastJoinThreshold=-1;

Broadcast Join 的條件有以下幾個:

(1)被廣播的表需要小於 spark.sql.autoBroadcastJoinThreshold所配置的值,預設是10M(或者加了 broadcast join的 hint)

(2)基表不能被廣播,比如 left outer join時,只能廣播右表。

看起來廣播是一個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用於廣播較小的表,否則資料的冗餘傳輸就遠大於shuffle的開銷;

另外,廣播時需要將被廣播的表collect 到driver端,然後由driver端將資料分發到其他executor,當頻繁有廣播出現時,對driver的記憶體也是一個考驗。

2、Shuffle Hash Join

當一側的表比較小時,我們選擇將其廣播出去以避免shuffle,提高效能。但因為被廣播的表首先被collect到driver端,然後被冗餘分發到每個executor上,所以當表比較大時,

採用 broadcast join 會對driver端和executor端造成較大的壓力。

但由於Spark 是一個分散式的計算引擎,可以通過分割槽的形式將大批量的資料劃分成n份較小的資料集進行平行計算。這種思想應用到Join上便是 Shuffle Hash Join 了。

利用key相同必然分割槽相同的這個原理,Spark SQL將較大表的 join 分而治之,先將表劃分成 n 個分割槽,再對兩個表中相對應分割槽的資料分別進行 Hash Join,這樣即在

一定程度上減少了driver廣播一側表的壓力,也減少了executor端取整張被廣播表的記憶體消耗。

Shuffle Hash Join 分為兩步:
1、對兩張表分別按照 join keys進行重分割槽,即shuffle,目的就是為了讓有相同 join  keys值的記錄分到對應的分割槽中。

2、對對應分割槽中的資料進行 join,此處先將小表分割槽構造為一張hash 表,然後根據大表分割槽中記錄的join keys值拿出來進行匹配。

Shuffle Hash Join 的條件有以下幾個:

1、分割槽的平均大小不超過 spark.sql.autoBroadcastJoinThreshold 所配置的值,預設是 10M。

2、基表不能被廣播,比如 left outer join 時,只能廣播右表。

3、一側的表要明顯小於另外一側,小的一側將被廣播(明顯小於的定義為3倍小)

我們可以看到,在一定大小的表中,SparkSQL從時空結合的角度來看,將兩個表進行重新分割槽,並且對小表中的分割槽進行hash化,從而完成join。

在保持一定複雜度的基礎上,儘量減少driver和executor的記憶體壓力,提升了計算時的穩定性。

Sort Merge Join (大表對大表)

上面介紹的兩種實現對於一定大小的表表適用,但當兩個表都非常大時,顯然無論用哪種都會對計算記憶體造成很大壓力。這是因為join 時兩者採取的都是 hash join,

是將一側的資料完全載入到記憶體中,使用 hash code取 join keys值相等的記錄進行連線。

當兩個表都非常大時,SparkSQL 採用了一種全新的方案來對標進行 join,即 Sort Merge Join 。這種實現方式不用將一側資料全部載入後再進行 hash join,但需要在

join 前將資料排序。

可以看到,首先將兩張表按照 join keys 進行了重新shuffle,保證 join keys值相同的記錄會被分在相應的分割槽。分割槽後對每個分割槽內的資料進行排序,排序後

再對相應的分割槽內的記錄進行連線。

因為兩個序列都是有序的,從頭遍歷,碰到 key 相同的就輸出,如果不同,左邊小就繼續取左邊,反之取右邊。

可以看出,無論分割槽有多大,Sort Merge Join 都不用把某一側的資料全部載入到記憶體中,而是即用即丟,從而大大提升了大數量下 sql join 的穩定性。