SparkSQL – Join 的三種方式
Join常見分類以及基本實現機制
當前SparkSQL支援三種Join演算法-shuffle hash join、broadcast hash join以及sort merge join。其中前兩者歸根到底都屬於hash join,只不過在hash join之前需要先shuffle還是先broadcast。其實,這些演算法並不是什麼新鮮玩意,都是資料庫幾十年前的老古董了(參考),只不過換上了分散式的皮而已。不過話說回來,SparkSQL/Hive…等等,所有這些大資料技術哪一樣不是來自於傳統資料庫技術,什麼語法解析AST、基於規則優化(CRO)、基於代價優化(CBO)、列存,都來自於傳統資料庫。就拿shuffle hash join和broadcast hash join來說,hash join演算法就來自於傳統資料庫,而shuffle和broadcast是大資料的皮,兩者一結合就成了大資料的演算法了。因此可以這樣說,大資料的根就是傳統資料庫,傳統資料庫人才可以很快的轉型到大資料。好吧,這些都是閒篇。
Hash Join
先來看看這樣一條SQL語句:select * from order,item where item.id = order.i_id,很簡單一個Join節點,參與join的兩張表是item和order,join key分別是item.id以及order.i_id。現在假設這個Join採用的是hash join演算法,整個過程會經歷三步:
1. 確定Build Table以及Probe Table:這個概念比較重要,Build Table使用join key構建Hash Table,而Probe Table使用join key進行探測,探測成功就可以join在一起。通常情況下,小表會作為Build Table,大表作為Probe Table。此事例中item為Build Table,order為Probe Table。
2. 構建Hash Table:依次讀取Build Table(item)的資料,對於每一行資料根據join key(item.id)進行hash,hash到對應的Bucket,生成hash table中的一條記錄。資料快取在記憶體中,如果記憶體放不下需要dump到外存。
3. 探測:再依次掃描Probe Table(order)的資料,使用相同的hash函式對映Hash Table中的記錄,對映成功之後再檢查join條件(item.id = order.i_id),如果匹配成功就可以將兩者join在一起。
基本流程可以參考上圖,這裡有兩個小問題需要關注:
1. hash join效能如何?很顯然,hash join基本都只掃描兩表一次,可以認為o(a+b),較之最極端的笛卡爾集運算a*b,不知甩了多少條街
2. 為什麼Build Table選擇小表?道理很簡單,因為構建的Hash Table最好能全部載入在記憶體,效率最高;這也決定了hash join演算法只適合至少一個小表的join場景,對於兩個大表的join場景並不適用;
上文說過,hash join是傳統資料庫中的單機join演算法,在分散式環境下需要經過一定的分散式改造,說到底就是儘可能利用分散式計算資源進行並行化計算,提高總體效率。hash join分散式改造一般有兩種經典方案:
1. broadcast hash join:將其中一張小表廣播分發到另一張大表所在的分割槽節點上,分別併發地與其上的分割槽記錄進行hash join。broadcast適用於小表很小,可以直接廣播的場景。
2. shuffler hash join:一旦小表資料量較大,此時就不再適合進行廣播分發。這種情況下,可以根據join key相同必然分割槽相同的原理,將兩張表分別按照join key進行重新組織分割槽,這樣就可以將join分而治之,劃分為很多小join,充分利用叢集資源並行化。
Broadcast Hash Join
如下圖所示,broadcast hash join可以分為兩步:
1. broadcast階段:將小表廣播分發到大表所在的所有主機。廣播演算法可以有很多,最簡單的是先發給driver,driver再統一分發給所有executor;要不就是基於bittorrete的p2p思路;
2. hash join階段:在每個executor上執行單機版hash join,小表對映,大表試探;
SparkSQL規定broadcast hash join執行的基本條件為被廣播小表必須小於引數spark.sql.autoBroadcastJoinThreshold,預設為10M。
Shuffle Hash Join
在大資料條件下如果一張表很小,執行join操作最優的選擇無疑是broadcast hash join,效率最高。但是一旦小表資料量增大,廣播所需記憶體、頻寬等資源必然就會太大,broadcast hash join就不再是最優方案。此時可以按照join key進行分割槽,根據key相同必然分割槽相同的原理,就可以將大表join分而治之,劃分為很多小表的join,充分利用叢集資源並行化。如下圖所示,shuffle hash join也可以分為兩步:
1. shuffle階段:分別將兩個表按照join key進行分割槽,將相同join key的記錄重分佈到同一節點,兩張表的資料會被重分佈到叢集中所有節點。這個過程稱為shuffle
2. hash join階段:每個分割槽節點上的資料單獨執行單機hash join演算法。
看到這裡,可以初步總結出來如果兩張小表join可以直接使用單機版hash join;如果一張大表join一張極小表,可以選擇broadcast hash join演算法;而如果是一張大表join一張小表,則可以選擇shuffle hash join演算法;那如果是兩張大表進行join呢?
Sort-Merge Join
SparkSQL對兩張大表join採用了全新的演算法-sort-merge join,如下圖所示,整個過程分為三個步驟:
1. shuffle階段:將兩張大表根據join key進行重新分割槽,兩張表資料會分佈到整個叢集,以便分散式並行處理
2. sort階段:對單個分割槽節點的兩表資料,分別進行排序
3. merge階段:對排好序的兩張分割槽表資料執行join操作。join操作很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出,否則取更小一邊,見下圖示意:
仔細分析的話會發現,sort-merge join的代價並不比shuffle hash join小,反而是多了很多。那為什麼SparkSQL還會在兩張大表的場景下選擇使用sort-merge join演算法呢?這和Spark的shuffle實現有關,目前spark的shuffle實現都適用sort-based shuffle演算法,因此在經過shuffle之後partition資料都是按照key排序的。因此理論上可以認為資料經過shuffle之後是不需要sort的,可以直接merge。
經過上文的分析,可以明確每種Join演算法都有自己的適用場景,資料倉庫設計時最好避免大表與大表的join查詢,SparkSQL也可以根據記憶體資源、頻寬資源適量將引數spark.sql.autoBroadcastJoinThreshold調大,讓更多join實際執行為broadcast hash join。
總結
Join操作是傳統資料庫中的一個高階特性,尤其對於當前MySQL資料庫更是如此,原因很簡單,MySQL對Join的支援目前還比較有限,只支援Nested-Loop Join演算法,因此在OLAP場景下MySQL是很難吃的消的,不要去用MySQL去跑任何OLAP業務,結果真的很難看。不過好訊息是MySQL在新版本要開始支援Hash Join了,這樣也許在將來也可以用MySQL來處理一些小規模的OLAP業務。
和MySQL相比,PostgreSQL、SQLServer、Oracle等這些資料庫對Join支援更加全面一些,都支援Hash Join演算法。由PostgreSQL作為核心構建的分散式系統Greenplum更是在資料倉庫中佔有一席之地,這和PostgreSQL對Join演算法的支援其實有很大關係。
總體而言,傳統資料庫單機模式做Join的場景畢竟有限,也建議儘量減少使用Join。然而大資料領域就完全不同,Join是標配,OLAP業務根本無法離開表與表之間的關聯,對Join的支援成熟度一定程度上決定了系統的效能,誇張點說,’得Join者得天下’。本文只是試圖帶大家真正走進Join的世界,瞭解常用的幾種Join演算法以及各自的適用場景。