探究Presto SQL引擎(2)-淺析Join
作者:vivo網際網路技術-Shuai Guangying
在《探究Presto SQL引擎(1)-巧用Antlr》中,我們介紹了Antlr的基本用法以及如何使用Antlr4實現解析SQL查詢CSV資料,更加深入理解Presto查詢引擎支援的SQL語法以及實現思路。
本次帶來的是系列文章的第2篇,本文梳理了Join的原理,以及Join演算法在Presto中的實現思路。通過理論和實踐的結合,可以在理解原理的基礎上,更加深入理解Join演算法在OLAP場景下的工程落地技巧,比如火山模型,列式儲存,批量處理等思想的應用。
一、背景
在業務開發中使用資料庫,通常會有規範不允許過多表的Join。例如阿里巴巴開發手冊中,有如下的規定:
【強制】超過三個表禁止Join。需要Join的欄位,資料型別必須絕對一致;多表關聯查詢時,保證被關聯的欄位需要有索引。說明:即使雙表Join也要注意表索引、SQL效能。
在大資料數倉的建設中,儘管我們有星型結構和雪花結構,但是最終交付業務使用的大多是寬表。
可以看出業務使用資料庫中的一個矛盾點:我們需要Join來提供靈活的關聯操作,但是又要儘量避免多表和大表Join帶來的效能問題。這是為什麼呢?
二、Join的基本原理
在資料庫中Join提供的語義是非常豐富的。簡單總結如下:
通常理解Join的實現原理,從Cross Join是最好的切入點,也就是所謂的笛卡爾積。對於集合進行笛卡爾積運算,理解非常簡單,就是窮舉兩個集合中元素所有的組合情況。在資料庫中,集合就對應到資料表中的所有行(tuples),集合中的元素就對應到單行(tuple)。所以實現Cross Join的演算法也就呼之欲出了。
實現的程式碼樣例如下:
List<Tuple> r = newArrayList( new Tuple(newArrayList(1,"a")), new Tuple(newArrayList(2,"b"))); List<Tuple> s = newArrayList( new Tuple(newArrayList(3,"c")), new Tuple(newArrayList(4,"d"))); int cnt =0; for(Tuple ri:r){ for(Tuple si:s){ Tuple c = new Tuple().merge(ri).merge(si); System.out.println(++cnt+": "+ c); } } /** * out: 1: [1, a, 3, c] 2: [1, a, 4, d] 3: [2, b, 3, c] 4: [2, b, 4, d] */
可以看出實現邏輯非常簡單,就是兩個For迴圈巢狀。
2.1 Nested Loop Join演算法
在這個基礎上,實現Inner Join的第一個演算法就順其自然了。非常直白的名稱:Nested Loop,實現關鍵點如下:
(來源:Join Processing in Relational Databases)
其中,θ操作符可以是:=, !=, <, >, ≤, ≥。
相比笛卡爾積的實現思路,也就是添加了一層if條件的判斷用於過濾滿足條件的組合。
對於Nested Loop演算法,最關鍵的點在於它的執行效率。假如參與Join的兩張表一張量級為1萬,一張量級為10w,那麼進行比較的次數為1w*10w=10億次。在大資料時代,通常一張表資料量都是以億為單位,如果使用Nested Loop Join演算法,那麼Join操作的比較次數直接就是天文數字了。所以Nested Loop Join基本上是作為萬不得已的保底方案。Nested Loop這個框架下,常見的優化措施如下:
-
小表驅動大表,即資料量較大的集作為於for迴圈的內部迴圈。
-
一次處理一個數據塊,而不是一條記錄。也就是所謂的Block Nested Loop Join,通過分塊降低IO次數,提升快取命中率。
值得一提的是Nested Loop Join的思想雖然非常樸素,但是天然的具備分散式、並行的能力。這也是為什麼各類NoSQL資料庫中依然保留Nested Loop Join實現的重要一點。雖然單機序列執行慢,但是可以並行化的話,那就是加機器能解決的問題了。
2.2 Sort Merge Join演算法
通過前面的分析可以知道,Nested Loop Join演算法的關鍵問題在於比較次數過多,演算法的複雜度為O(m*n),那麼突破口也得朝著這個點。如果集合中的元素是有序的,比較的次數會大幅度降低,避免很多無意義的比較運算。對於有序的所以Join的第二種實現方式如下所描述:
(來源:Join Processing in Relational Databases)s)
通過將JOIN操作拆分成Sort和Merge兩個階段實現Join操作的加速。對於Sort階段,是可以提前準備好可以複用的。這樣的思想對於MySQL這類關係型資料庫是非常友好的,這也能解釋阿里巴巴開發手冊中要求關聯的欄位必須建立索引,因為索引保證了資料有序。該演算法時間複雜度為排序開銷O(m_log(m)+n_log(n))+合併開銷O(m+n)。但是通常由於索引保證了資料有序,索引其時間複雜度為O(m+n)。
2.3 Hash Join演算法
Sort Merge Join的思想在落地中有一定的限制。所謂成也蕭何敗蕭何,對於基於Hadoop的數倉而言,保證資料儲存的有序性這個點對於效能影響過大。在海量資料的背景下,維護索引成本是比較大的。而且索引還依賴於使用場景,不可能每個欄位都建一個索引。在資料表關聯的場景是大表關聯小表時,比如:使用者表(大表)--當日訂單表(小表);事實表(大表)–維度表(小表),可以通過空間換時間。回想一下,在基礎的資料結構中,tree結構和Hash結構可謂資料處理的兩大法寶:一個保證資料有序方便實現區間搜尋,一個通過hash函式實現精準命中點對點查詢效率高。
在這樣的背景下,通過將小表Hash化,實現Join的想法也就不足為奇了。
(來源:Join Processing in Relational Databases)
而且即使一張表在單機環境生成Hash記憶體消耗過大,還可以利用Hash將資料進行切分,實現分散式能力。所以,在Presto中Join演算法通常會選擇Hash Join,該演算法的時間複雜度為O(m+n)。
通過相關資料的學習,可以發現Join演算法的實現原理還是相當簡單的,排序和Hash是資料結構最為基礎的內容。瞭解了Join的基本思想,如何落地實踐出來呢?畢竟talk is cheap。在專案中實現Join之前,需要一些鋪墊知識。通常來說核心演算法是皇冠上的明珠,但是僅有明珠是不夠的還需要皇冠作為底座。
三、Join工程化前置條件
3.1 SQL處理架構-火山模型
在將Join演算法落地前,需要先了解一下資料庫處理資料的基本架構。在理解架構的基礎上,才能將Join演算法放置到合適的位置。在前面系列文章中探討了基於antlr實現SQL語句的解析。可以發現SQL語法支援的操作型別非常豐富:查詢表(TableScan),過濾資料(Filter),排序(Order),限制(Limit),欄位進行運算(Project), 聚合(Group),關聯(Join)等。為了實現上述的能力,需要一個具備並行化能力且可擴充套件的架構。
1994年Goetz Graefe在論文《Volcano-An Extensible and Parallel Query Evaluation System》提出了一個架構設計思想,這就是大名鼎鼎的火山模型,也稱為迭代模型。火山模型其實包含了檔案系統和查詢處理兩個部分,這裡我們重點關注查詢處理的設計思想。架構圖如下:
(來源:《Balancing vectorized execution with bandwidth-optimized storage》)
簡單解讀一下:
職責分離:將不同操作獨立成一個的Operator,Operator採用open-next-close的迭代器模式。
例如對於SQL 。
SELECT Id, Name, Age, (Age - 30) * 50 AS Bonus
FROM People
WHERE Age > 30
對應到Scan, Select, Project三個Operator,資料互動通過next()函式實現。上述的理論在Presto中可以對應起來,例如Presto中幾個常用的Operator, 基本上是見名知意:
動態組裝:Operator基於SQL語句的解析實現動態組裝,多個Operator形成一個管道(pipeline)。
例如:print和predicate兩個operator形成一個管道:
(來源: 《Volcano-An Extensible and Parallel Query Evaluation System》)
在火山模型的基礎上,Presto吸收了資料庫領域的其他思想,對基礎的火山模型進行了優化改造,主要體現在如下幾點:
-
Operator資料處理優化成一次一個Page,而不是一次行(也稱為tuple)。
-
Page的儲存採用列式結構。即相同的列封裝到一個Block中。
批量處理結合列式儲存奠定了向量化計算的基礎。這也是資料庫領域的優化方向。
3.2 批量處理和列式儲存
在研讀Presto原始碼時,幾乎到處都可以看到Page/Block的身影。所以理解Page/Block背後的思想是理解Presto實現機制的基礎。有相關書籍和文件講解Page/Block的概念,但是由於這些概念是跟其他概念混在一起呈現,導致一時間不容易理解。
筆者認為Type-Block-Page三者放在一起,更容易理解。我們使用資料庫,通常需要定義表,欄位名稱,欄位型別。在傳統的DBMS中,通常是按行儲存資料,通常結構如下:
(來源:《資料庫系統實現》)
但是通常OLAP場景不需要讀取所有的欄位,基於這樣的場景,就衍生出來了列式儲存。就是我們看到的如下結構:
(來源:《Presto技術內幕》)
即每個欄位對應一個Block, 多個Block的切面才是一條記錄,也就是所謂的行,在一些論文中稱為tuple。通過對比可以清楚看出Presto中,Page就是典型了列式儲存的實現。所以在Presto中,每個Type必然會關聯到一種Block。例如:bigint型別就對應著LongArrayBlockBuilder,varchar型別對應著VariableWidthBlock。
理解了原理,操作Page/Block就變得非常簡單了,簡單的demo程式碼如下:
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarcharType;
import com.google.common.collect.Lists;
import io.airlift.slice.Slice;
import java.util.List;
import static io.airlift.slice.Slices.utf8Slice;
/**
* PageBlockDemo
*
* @version 1.0
* @since 2021/6/22 19:26
*/
public class PageBlockDemo {
private static Page buildPage(List<Type> types,List<Object[]> dataSet){
PageBuilder pageBuilder = new PageBuilder(types);
// 封裝成Page
for(Object[] row:dataSet){
// 完成一行
pageBuilder.declarePosition();
for (int column = 0; column < types.size(); column++) {
BlockBuilder out = pageBuilder.getBlockBuilder(column);
Object colVal = row[column];
if(colVal == null){
out.appendNull();
}else{
Type type = types.get(column);
Class<?> javaType = type.getJavaType();
if(javaType == long.class){
type.writeLong(out,(long)colVal);
}else if(javaType == Slice.class){
type.writeSlice(out, utf8Slice((String)colVal));
}else{
throw new UnsupportedOperationException("not implemented");
}
}
}
}
// 生成Page
Page page = pageBuilder.build();
pageBuilder.reset();
return page;
}
private static void readColumn(List<Type> types,Page page){
// 從Page中讀取列
for(int column=0;column<types.size();column++){
Block block = page.getBlock(column);
Type type = types.get(column);
Class<?> javaType = type.getJavaType();
System.out.print("column["+type.getDisplayName()+"]>>");
List<Object> colList = Lists.newArrayList();
for(int pos=0;pos<block.getPositionCount();pos++){
if(javaType == long.class){
colList.add(block.getLong(pos));
}else if(javaType == Slice.class){
colList.add(block.getSlice(pos,0,block.getSliceLength(pos)).toStringUtf8());
}else{
throw new UnsupportedOperationException("not implemented");
}
}
System.out.println(colList);
}
}
public static void main(String[] args) {
/**
* 假設有兩個欄位,一個欄位型別為int, 一個欄位型別為varchar
*/
List<Type> types = Lists.newArrayList(BigintType.BIGINT, VarcharType.VARCHAR);
// 按行儲存
List<Object[]> dataSet = Lists.newArrayList(
new Object[]{1L,"aa"},
new Object[]{2L,"ba"},
new Object[]{3L,"cc"},
new Object[]{4L,"dd"});
Page page = buildPage(types, dataSet);
readColumn(types,page);
}
}
// 執行結果:
//column[bigint]>>[1, 2, 3, 4]
//column[varchar]>>[aa, ba, cc, dd]
將資料封裝成Page在各個Operator中流轉,一方面避免了物件的序列化和反序列化成本,另一方面相比tuple的方式降低了函式呼叫的開銷。這跟集裝箱運貨降低運輸成本的思想是類似的。
四、Join演算法的工程實踐
理解了Join的核心演算法和基礎架構,結合前文中對antlr實現SQL表示式的解析以及實現where條件過濾,我們已經具備了實現Join的基礎條件。接下來簡單講述一下Join演算法的落地流程。首先在語法層面需要支援Join的語法,由於本文目的在於研究演算法實現流程,而不在於實現完整的Join功能,因此我們暫且先考慮支援兩張表單欄位的等值Join語法。
首先在語法上需要支援Join, 基於antlr語法的定義關鍵點如下:
querySpecification
: SELECT selectItem (',' selectItem)*
(FROM relation (',' relation)*)?
(WHERE where=booleanExpression)?
;
selectItem
: expression #selectSingle
;
relation
: left=relation
(
joinType JOIN rightRelation=relation joinCriteria
) #joinRelation
| sampledRelation #relationDefault
;
joinType
: INNER?
;
joinCriteria
: ON booleanExpression
;
上述的語法定義將Join的關鍵要素拆解得非常清晰:Join的左表, Join的型別,Join關鍵詞, Join的右表, Join的關聯條件。例如,通常我們最簡單的Join語句用例如下(借用presto的tpch資料來源):
select t2.custkey, t2.phone, t1.orderkey from orders t1 inner join customer t2 on t1.custkey=t2.custkey limit 10;
對應著語法和SQL語句用例,可以看到在將Join演算法落地,還需要考慮如下細節點:
-
檢測SQL語句,確保SQL語句符合語法要求。
-
梳理表的別名和欄位的對應關係,確保查詢的欄位和表能夠對應起來,Join條件的欄位型別能夠匹配。
-
Join演算法的選取,是HashJoin還是NestedLoopJoin還是SortMergeJoin?
-
哪個表是build表,哪個表是probe表?
-
Join條件的判斷如何實現?
-
整個查詢涉及到Operator如何組裝,以實現最終結果的輸出?
我們回顧一下SQL執行的關鍵流程:
(來源: Query Execution Flow Architecture (SQL Server))
基於上面的流程,問題其實已經有了答案。
-
Parser:藉助antlr的能力即可實現SQL語法的檢測。
-
Binding:基於SQL語句生成AST,利用元資料檢測欄位和表的對映關係以及Join條件的欄位型別。
-
Planner:基於AST生成查詢計劃。
-
Executor:基於查詢計劃生成對應的Operator並執行。
以NestedLoop Join演算法為例,瞭解一下Presto的實現思路。對於NestedLoopJoin Join演算法的落地,在Presto中其實是拆解為兩個階段:組合階段和過濾階段。在實現JoinOperator時,只需負責兩個表資料的笛卡爾積組合即可。核心程式碼如下:
// NestedLoopPageBuilder中實現兩個Page計算笛卡爾積的處理邏輯,這裡RunLengthEncodedBlock用於一個元素複製,典型地笛卡爾積計算中需要將一列元素從1行復製成多行。
@Override
public Page next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
if (noColumnShortcutResult >= 0) {
rowIndex = maxRowIndex;
return new Page(noColumnShortcutResult);
}
rowIndex++;
// Create an array of blocks for all columns in both pages.
Block[] blocks = new Block[numberOfProbeColumns + numberOfBuildColumns];
// Make sure we always put the probe data on the left and build data on the right.
int indexForRleBlocks = buildPageLarger ? 0 : numberOfProbeColumns;
int indexForPageBlocks = buildPageLarger ? numberOfProbeColumns : 0;
// For the page with less rows, create RLE blocks and add them to the blocks array
for (int i = 0; i < smallPage.getChannelCount(); i++) {
Block block = smallPage.getBlock(i).getSingleValueBlock(rowIndex);
blocks[indexForRleBlocks] = new RunLengthEncodedBlock(block, largePage.getPositionCount());
indexForRleBlocks++;
}
// Put the page with more rows in the blocks array
for (int i = 0; i < largePage.getChannelCount(); i++) {
blocks[indexForPageBlocks + i] = largePage.getBlock(i);
}
return new Page(largePage.getPositionCount(), blocks);
}
五、小結
本文簡單梳理了Join的基本演算法以及在Presto中實現的基本框架,並以NestedLoop Join演算法為例,演示了在Presto中的實現核心點。可以看出相比原始的演算法描述,Presto的工程落地是截然不同: 不僅支援了所有的Join語義,而且實現了分散式能力。這其中有架構層面的思考,也有效能層面的思考,非常值得探索跟研究。就Join演算法,可以探索的點還有很多,比如多表Join的順序選取,大表跟小表Join的演算法優化,Semi Join的演算法優化,Join演算法資料傾斜的問題等等,可謂路漫漫其修遠兮,將在後續系列文章中繼續分析探索。