Apache Flink 漫談系列 - JOIN LATERAL
聊什麼
上一篇《Apache Flink 漫談系列 - JOIN運算元》我們對最常見的JOIN做了詳盡的分析,本篇介紹一個特殊的JOIN,那就是JOIN LATERAL。JOIN LATERAL為什麼特殊呢,直觀說因為JOIN的右邊不是一個實際的物理表,而是一個VIEW或者Table-valued Funciton。如下圖所示:
本篇會先介紹傳統資料庫對LATERAL JOIN的支援,然後介紹Apache Flink目前對LATERAL JOIN的支援情況。
實際問題
假設我們有兩張表,一張是Customers表(消費者id, 所在城市), 一張是Orders表(訂單id,消費者id),兩張表的DDL(SQL Server)如下:
- Customers
CREATE TABLE Customers (
customerid char(5) NOT NULL,
city varchar (10) NOT NULL
)
insert into Customers values('C001','Beijing');
insert into Customers values('C002','Beijing');
insert into Customers values('C003','Beijing');
insert into Customers values('C004','HangZhou');
檢視資料:
- Orders
CREATE TABLE Orders(
orderid char(5) NOT NULL,
customerid char(5) NULL
)
insert into Orders values('O001','C001');
insert into Orders values('O002','C001');
insert into Orders values('O003','C003');
insert into Orders values('O004','C001');
檢視資料:
問題示例
假設我們想查詢所有Customers的客戶ID,地點和訂單資訊,我們想得到的資訊是:
用INNER JOIN解決
如果大家查閱了《Apache Flink 漫談系列 - JOIN運算元》,我想看到這樣的查詢需求會想到INNER JOIN來解決,SQL如下:
SELECT
c.customerid, c.city, o.orderid
FROM Customers c JOIN Orders o
ON o.customerid = c.customerid
查詢結果如下:
但如果我們真的用上面的方式來解決,就不會有本篇要介紹的內容了,所以我們換一種寫法。
用 Correlated subquery解決
Correlated subquery 是在subquery中使用關聯表的欄位,subquery可以在FROM Clause中也可以在WHERE Clause中。
- WHERE Clause
用WHERE Clause實現上面的查詢需求,SQL如下:
SELECT
c.customerid, c.city
FROM Customers c WHERE c.customerid IN (
SELECT
o.customerid, o.orderid
FROM Orders o
WHERE o.customerid = c.customerid
)
執行情況:
上面的問題是用在WHERE Clause裡面subquery的查詢列必須和需要比較的列對應,否則我們無法對o.orderid
進行投影, 上面查詢我為什麼要加一個o.orderid
呢,因為查詢需求是需要o.orderid
的,去掉o.orderid
查詢能成功,但是拿到的結果並不是我們想要的,如下:
SELECT
c.customerid, c.city
FROM Customers c WHERE c.customerid IN (
SELECT
o.customerid
FROM Orders o
WHERE o.customerid = c.customerid
)
查詢結果:
可見上面查詢結果缺少了o.orderid
,不能滿足我們的查詢需求。
- FROM Clause
用FROM Clause實現上面的查詢需求,SQL如下:
SELECT
c.customerid, c.city, o.orderid
FROM Customers c, (
SELECT
o.orderid, o.customerid
FROM Orders o
WHERE o.customerid = c.customerid
) as o
我們會得到如下錯誤:
錯誤資訊提示我們無法識別c.customerid
。在ANSI-SQL裡面FROM Clause裡面的subquery是無法引用左邊表資訊的,所以簡單的用FROM Clause裡面的subquery,也無法解決上面的問題,
那麼上面的查詢需求除了INNER JOIN
我們還可以如何解決呢?
JOIN LATERAL
我們分析上面的需求,本質上是根據左表Customers的customerid,去查詢右表的Orders資訊,就像一個For迴圈一樣,外層是遍歷左表Customers所有資料,內層是根據左表Customers的每一個Customerid去右表Orders中進行遍歷查詢,然後再將符合條件的左右表資料進行JOIN,這種根據左表逐條資料動態生成右表進行JOIN的語義,SQL標準裡面提出了LATERAL
關鍵字,也叫做 lateral drive table
。
CROSS APPLY和LATERAL
上面的示例我們用的是SQL Server進行測試的,這裡在多提一下在SQL Server裡面是如何支援 LATERAL
的呢?SQL Server是用自己的方言 CROSS APPLY
來支援的。那麼為啥不用ANSI-SQL的LATERAL
而用CROSS APPLY
呢? 可能的原因是當時SQL Server為了解決TVF問題而引入的,同時LATERAL
是SQL2003引入的,而CROSS APPLY
是SQL Server 2005就支援了,SQL Server 2005的開發是在2000年就進行了,這個可能也有個時間差,等LATERAL
出來的時候,CROSS APPLY
在SQL Server裡面已經開發完成了。所以種種原因SQL Server裡面就採用了CROSS APPLY
,但CROSS APPLY
的語義與LATERAL
卻完全一致,同時後續支援LATERAL
的Oracle12和PostgreSQL94同時支援了LATERAL
和CROSS APPLY
。
問題解決
那麼我們回到上面的問題,我們用SQL Server的CROSS APPLY
來解決上面問題,SQL如下:
上面得到的結果完全滿足查詢需求。
JOIN LATERAL 與 INNER JOIN 關係
上面的查詢需求並沒有體現JOIN LATERAL
和INNER JOIN
的區別,我們還是以SQL Server中兩個查詢執行Plan來觀察一下:
上面我們發現經過SQL Server優化器優化之後的兩個執行plan完全一致,那麼為啥還要再造一個LATERAL
出來呢?
效能方面
我們將上面的查詢需求稍微改變一下,我們查詢所有Customer和Customers的第一份訂單資訊。
- LATERAL 的寫法
SELECT
c.customerid, c.city, o.orderid
FROM Customers c CROSS APPLY (
SELECT
TOP(1) o.orderid, o.customerid
FROM Orders o
WHERE o.customerid = c.customerid
ORDER BY o.customerid, o.orderid
) as o
查詢結果:
我們發現雖然C001的Customer有三筆訂單,但是我們查詢的TOP1資訊。
- JOIN 寫法
SELECT c.customerid, c.city, o.orderid
FROM Customers c
JOIN (
SELECT
o2.*,
ROW_NUMBER() OVER (
PARTITION BY customerid
ORDER BY orderid
) AS rn
FROM Orders o2
) o
ON c.customerid = o.customerid AND o.rn = 1
查詢結果:
如上我們都完成了查詢需求,我們在來看一下執行Plan,如下:
我們直觀發現完成相同功能,使用CROSS APPLY
進行查詢,執行Plan簡單許多。
功能方面
在功能方面INNER JOIN
本身在ANSI-SQL中是不允許 JOIN 一個Function的,這也是SQL Server當時引入CROSS APPLY
的根本原因。我們以一個SQL Server中DMV(相當於TVF)查詢為例:
SELECT
name, log_backup_time
FROM sys.databases AS s
CROSS APPLY sys.dm_db_log_stats(s.database_id);
查詢結果:
Apache Flink對 LATERAL的支援
前面我花費了大量的章節來向大家介紹ANSI-SQL和傳統資料庫以SQL Server為例如何支援LATERAL
的,接下來我們看看Apache Flink對LATERAL
的支援情況。
Calcite
Apache Flink 利用 Calcite進行SQL的解析和優化,目前Calcite完全支援LATERAL
語法,示例如下:
SELECT
e.NAME, e.DEPTNO, d.NAME
FROM EMPS e, LATERAL (
SELECT
*
FORM DEPTS d
WHERE e.DEPTNO=d.DEPTNO
) as d;
查詢結果:
我使用的是Calcite官方自帶測試資料。
Flink
截止到Flink-1.6.2,Apache Flink 中有兩種場景使用LATERAL
,如下:
- UDTF(TVF) - User-defined Table Funciton
- Temporal Table - 涉及內容會在後續篇章單獨介紹。
本篇我們以在TVF(UDTF)為例說明 Apache Fink中如何支援LATERAL
。
UDTF
UDTF- User-defined Table Function是Apache Flink中三大使用者自定義函式(UDF,UDTF,UDAGG)之一。 自定義介面如下:
- 基類
/**
* Base class for all user-defined functions such as scalar functions, table functions,
* or aggregation functions.
*/
abstract class UserDefinedFunction extends Serializable {
// 關鍵是FunctionContext中提供了若干高階屬性(在UDX篇會詳細介紹)
def open(context: FunctionContext): Unit = {}
def close(): Unit = {}
}
- TableFunction
/**
* Base class for a user-defined table function (UDTF). A user-defined table functions works on
* zero, one, or multiple scalar values as input and returns multiple rows as output.
*
* The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
* method. An evaluation method must be declared publicly, not static and named "eval".
* Evaluation methods can also be overloaded by implementing multiple methods named "eval".
*
* User-defined functions must have a default constructor and must be instantiable during runtime.
*
* By default the result type of an evaluation method is determined by Flink's type extraction
* facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
* complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
* can be manually defined by overriding [[getResultType()]].
*/
abstract class TableFunction[T] extends UserDefinedFunction {
// 對於泛型T,如果是基礎型別那麼Flink框架可以自動識別,
// 對於使用者自定義的複雜物件,需要使用者overwrite這個實現。
def getResultType: TypeInformation[T] = null
}
上面定義的核心是要求使用者實現eval
方法,我們寫一個具體示例。
- 示例
// 定義一個簡單的UDTF返回型別,對應介面上的 T
case class SimpleUser(name: String, age: Int)
// 繼承TableFunction,並實現evale方法
// 核心功能是解析以#分割的字串
class SplitTVF extends TableFunction[SimpleUser] {
// make sure input element's format is "<string>#<int>"
def eval(user: String): Unit = {
if (user.contains("#")) {
val splits = user.split("#")
collect(SimpleUser(splits(0), splits(1).toInt))
}
}
}
示例(完整的ITCase):
- 測試資料
我們構造一個只包含一個data欄位的使用者表,使用者表資料如下:
data |
---|
Sunny#8 |
Kevin#36 |
Panpan#36 |
- 查詢需求
查詢的需求是將data欄位flatten成為name和age兩個欄位的表,期望得到:
name | age |
---|---|
Sunny | 8 |
Kevin | 36 |
Panpan | 36 |
- 查詢示例
我們以ITCase方式完成如上查詢需求,完整程式碼如下:
@Test
def testLateralTVF(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStateBackend(getStateBackend)
StreamITCase.clear
val userData = new mutable.MutableList[(String)]
userData.+=(("Sunny#8"))
userData.+=(("Kevin#36"))
userData.+=(("Panpan#36"))
val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)"
val users = env.fromCollection(userData).toTable(tEnv, 'data)
val tvf = new SplitTVF()
tEnv.registerTable("userTab", users)
tEnv.registerFunction("splitTVF", tvf)
val result = tEnv.SQLQuery(SQLQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
StreamITCase.testResults.foreach(println(_))
}
執行結果:
上面的核心語句是:
val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)"
如果大家想執行上面的示例,請查閱《Apache Flink 漫談系列 - SQL概覽》中 原始碼方式 搭建測試環境。
小結
本篇重點向大家介紹了一種新的JOIN
型別 - JOIN LATERAL
。並向大家介紹了SQL Server中對LATERAL
的支援方式,詳細分析了JOIN LATERAL
和INNER JOIN
的區別與聯絡,最後切入到Apache Flink中,以UDTF
示例說明了Apache Flink中對JOIN LATERAL
的支援,後續篇章會介紹Apache Flink中另一種使用LATERAL
的場景,就是Temporal JION,Temporal JION也是一種新的JOIN型別,我們下一篇再見!
關於點贊和評論
本系列文章難免有很多缺陷和不足,真誠希望讀者對有收穫的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!