1. 程式人生 > >Apache Flink 漫談系列 - 持續查詢(Continuous Queries)

Apache Flink 漫談系列 - 持續查詢(Continuous Queries)

實際問題

我們知道在流計算場景中,資料是源源不斷的流入的,資料流永遠不會結束,那麼計算就永遠不會結束,如果計算永遠不會結束的話,那麼計算結果何時輸出呢?本篇將介紹Apache Flink利用持續查詢來對流計算結果進行持續輸出的實現原理。

資料管理

在介紹持續查詢之前,我們先看看Apache Flink對資料的管理和傳統資料庫對資料管理的區別,以MySQL為例,如下圖:
image

如上圖所示傳統資料庫是資料儲存和查詢計算於一體的架構管理方式,這個很明顯,oracle資料庫不可能管理MySQL資料庫資料,反之亦然,每種資料庫廠商都有自己的資料庫管理和儲存的方式,各自有特有的實現。在這點上Apache Flink海納百川(也有corner case),將data store 進行抽象,分為source(讀) 和 sink(寫)兩種型別介面,然後結合不同儲存的特點提供常用資料儲存的內建實現,當然也支援使用者自定義的實現。

那麼在巨集觀設計上Apache Flink與傳統資料庫一樣都可以對資料表進行SQL查詢,並將產出的結果寫入到資料儲存裡面,那麼Apache Flink上面的SQL查詢和傳統資料庫查詢的區別是什麼呢?Apache Flink又是如何做到求同(語義相同)存異(實現機制不同),完美支援ANSI-SQL的呢?

靜態查詢

傳統資料庫中對錶(比如 flink_tab,有user和clicks兩列,user主鍵)的一個查詢SQL(select * from flink_tab)在資料量允許的情況下,會立刻返回表中的所有資料,在查詢結果顯示之後,對資料庫表flink_tab的DML操作將與執行的SQL無關了。也就是說傳統資料庫下面對錶的查詢是靜態查詢,將計算的最終查詢的結果立即輸出,如下:

select * from flink_tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      1 |
+----+------+--------+
1 row in set (0.00 sec)

當我執行完上面的查詢,查詢結果立即返回,上面情況告訴我們表 flink_tab裡面只有一條記錄,id=1,user=Mary,clicks=1; 這樣傳統資料庫表的一條查詢語句就完全結束了。傳統資料庫表在查詢那一刻我們這裡叫Static table,是指在查詢的那一刻資料庫表的內容不再變化了,查詢進行一次計算完成之後表的變化也與本次查詢無關了,我們將在Static Table 上面的查詢叫做靜態查詢。

持續查詢

什麼是連續查詢呢?連續查詢發生在流計算上面,在 《Apache Flink 漫談系列 - 流表對偶(duality)性》 中我們提到過Dynamic Table,連續查詢是作用在Dynamic table上面的,永遠不會結束的,隨著表內容的變化計算在不斷的進行著...

靜態/持續查詢特點

靜態查詢和持續查詢的特點就是《Apache Flink 漫談系列 - 流表對偶(duality)性》中所提到的批與流的計算特點,批一次查詢返回一個計算結果就結束查詢,流一次查詢不斷修正計算結果,查詢永遠不結束,表格示意如下:

查詢型別 計算次數 計算結果
靜態查詢 1 最終結果
持續查詢 無限 不斷更新

靜態/持續查詢關係

接下來我們以flink_tab表實際操作為例,體驗一下靜態查詢與持續查詢的關係。假如我們對flink_tab表再進行一條增加和一次更新操作,如下:

MySQL> insert into flink_tab(user, clicks) values ('Bob', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update flink_tab set clicks=2 where user='Mary';
Query OK, 1 row affected (0.06 sec)

這時候我們再進行查詢 select * from flink_tab ,結果如下:

MySQL> select * from flink_tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      2 |
|  2 | Bob  |      1 |
+----+------+--------+
2 rows in set (0.00 sec)

那麼我們看見,相同的查詢SQL(select * from flink_tab),計算結果完全 不 一樣了。這說明相同的sql語句,在不同的時刻執行計算,得到的結果可能不一樣(有點像廢話),就如下圖一樣:
image

假設不斷的有人在對錶flink_tab做操作,同時有一個人間歇性的發起對錶資料的查詢,上圖我們只是在三個時間點進行了3次查詢。並且在這段時間內資料表的內容也在變化。引起上面變化的DML如下:

MySQL> insert into flink_tab(user, clicks) values ('Llz', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update flink_tab set clicks=2 where user='Bob';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> update flink_tab set clicks=3 where user='Mary';
Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

到現在我們不難想象,上面圖內容的核心要點如下:

  • 時間
  • 表資料變化
  • 觸發計算
  • 計算結果更新

接下來我們利用傳統資料庫現有的機制模擬一下持續查詢...

無PK的 Append only 場景

接下來我們把上面隱式存在的時間屬性timestamp作為表flink_tab_ts(timestamp,user,clicks三列,無主鍵)的一列,再寫一個 觸發器(Trigger) 示例觀察一下:

timestamp user clicks
1525099013 Mary 1
1525099026 Bob 1
1525099035 Mary 2
1525099047 Llz 1
1525099056 Bob 2
1525099065 Mary 3
// INSERT 的時候查詢一下資料flink_tab_ts,將結果寫到trigger.sql中
 DELIMITER ;;
create trigger flink_tab_ts_trigger_insert after insert
on flink_tab_ts for each row
  begin
       select ts, user, clicks from flink_tab_ts into OUTFILE '/Users/jincheng.sunjc/testdir/atas/trigger.sql';
  end ;;
DELIMITER ;

上面的trigger要將查詢結果寫入本地檔案,預設MySQL是不允許寫入的,我們檢視一下:

MySQL> show variables like '%secure%';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| require_secure_transport | OFF   |
| secure_file_priv         | NULL  |
+--------------------------+-------+
2 rows in set (0.00 sec)

上面secure_file_priv屬性為NULL,說明MySQL不允許寫入file,我需要修改my.cnf在新增secure_file_priv=''開啟寫檔案限制;

MySQL> show variables like '%secure%';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| require_secure_transport | OFF   |
| secure_file_priv         |       |
+--------------------------+-------+
2 rows in set (0.00 sec)

下面我們對flink_tab_ts進行INSERT操作:
image

我們再來看看6次trigger 查詢計算的結果:

image

大家到這裡發現我寫了Trigger的儲存過程之後,每次在資料表flink_tab_ts進行DML操作的時候,Trigger就會觸發一次查詢計算,產出一份新的計算結果,觀察上面的查詢結果發現,結果表不停的增加(Append only)。

有PK的Update場景

我們利用flink_tab_ts的6次DML操作和自定義的觸發器TriggerL來介紹了什麼是持續查詢,做處理靜態查詢與持續查詢的關係。那麼上面的演示目的是為了說明持續查詢,所有操作都是insert,沒有基於主鍵的更新,也就是說Trigger產生的結果都是append only的,那麼大家想一想,如果我們操作flink_tab這張表,按主鍵user進行插入和更新操作,同樣利用Trigger機制來進行持續查詢,結果是怎樣的的呢? 初始化表,trigger:

drop table flink_tab;
create table flink_tab(
    user VARCHAR(100) NOT NULL,
    clicks INT NOT NULL,
    PRIMARY KEY (user)
 );

 DELIMITER ;;
create trigger flink_tab_trigger_insert after insert
on flink_tab for each row
  begin
       select user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql';
  end ;;
DELIMITER ;

DELIMITER ;;
create trigger flink_tab_trigger_ after update
on flink_tab for each row
  begin
        select ts, user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql';
  end ;;
DELIMITER ;

 同樣我做如下6次DML操作,Trigger 6次查詢計算:
image

在來看看這次的結果與append only 有什麼不同?

image

我想大家早就知道這結果了,資料庫裡面定義的PK所有變化會按PK更新,那麼觸發的6次計算中也會得到更新後的結果,這應該不難理解,查詢結果也是不斷更新的(Update)!

關係定義 

上面Append Only 和 Update兩種場景在MySQL上面都可以利用Trigger機制模擬 持續查詢的概念,也就是說資料表中每次資料變化,我們都觸發一次相同的查詢計算(只是計算時候資料的集合發生了變化),因為資料表不斷的變化,這個表就可以看做是一個動態表Dynamic Table,而查詢SQL(select * from flink_tab_ts) 被觸發器Trigger在滿足某種條件後不停的觸發計算,進而也不斷地產生新的結果。這種作用在Dynamic Table,並且有某種機制(Trigger)不斷的觸發計算的查詢我們就稱之為 持續查詢。

那麼到底靜態查詢和動態查詢的關係是什麼呢?在語義上 持續查詢 中的每一次查詢計算的觸發都是一次靜態查詢(相對於當時查詢的時間點),  在實現上 Apache Flink會利用上一次查詢結果+當前記錄 以增量的方式完成查詢計算。

特別說明: 上面我們利用 資料變化+Trigger方式描述了持續查詢的概念,這裡有必要特別強調一下的是資料庫中trigger機制觸發的查詢,每次都是一個全量查詢,這與Apache Flink上面流計算的持續查詢概念相同,但實現機制完全不同,Apache Flink上面的持續查詢內部實現是增量處理的,隨著時間的推移,每條資料的到來實時處理當前的那一條記錄,不會處理曾經來過的歷史記錄!

Apache Flink 如何做到持續查詢

動態表上面持續查詢

在 《Apache Flink 漫談系列 - 流表對偶(duality)性》 中我們瞭解到流和表可以相互轉換,在Apache Flink流計算中攜帶流事件的Schema,經過運算元計算之後再產生具有新的Schema的事件,流入下游節點,在產生新的Schema的Event和不斷流轉的過程就是持續查詢作用的結果,如下圖:
image

增量計算

我們進行查詢大多數場景是進行資料聚合,比如查詢SQL中利用count,sum等aggregate function進行聚合統計,那麼流上的資料來源源不斷的流入,我們既不能等所有事件流入結束(永遠不會結束)再計算,也不會每次來一條事件就像傳統資料庫一樣將全部事件集合重新整體計算一次,在持續查詢的計算過程中,Apache Flink採用增量計算的方式,也就是每次計算都會將計算結果儲存到state中,下一條事件到來的時候利用上次計算的結果和當前的事件進行聚合計算,比如 有一個訂單表,如下:

image

一個簡單的計數和求和查詢SQL:

// 求訂單總數和所有訂單的總金額
select count(id) as cnt,sum(amount)as sumAmount from order_tab;

這樣一個簡單的持續查詢計算,Apache Flink內部是如何處理的呢?如下圖:
image

如上圖,Apache Flink中每來一條事件,就進行一次計算,並且每次計算後結果會儲存到state中,供下一條事件到來時候進行計算,即:

result(n) = calculation(result(n-1), n)。

無PK的Append Only 場景 

在實際的業務場景中,我們只需要進行簡單的資料統計,然後就將統計結果寫入到業務的資料儲存系統裡面,比如上面統計訂單數量和總金額的場景,訂單表本身是一個append only的資料來源(假設沒有更新,截止到2018.5.14日,Apache Flink內部支援的資料來源都是append only的),在持續查詢過程中經過count(id),sum(amount)統計計算之後產生的動態表也是append only的,種場景Apache Flink內部只需要進行aggregate function的聚合統計計算就可以,如下:

image

有PK的Update 場景

現在我們將上面的訂單場景稍微變化一下,在資料表上面我們將金額欄位amount,變為地區欄位region,資料如下:

image

查詢統計的變為,在計算具有相同訂單數量的地區數量;查詢SQL如下:

 CREATE TABLE order_tab(
   id BIGINT,
   region VARCHAR
 ) 

CREATE TABLE region_count_sink(
   order_cnt BIGINT, 
   region_cnt BIGINT,
   PRIMARY KEY(order_cnt) -- 主鍵
) 

-- 按地區分組計算每個地區的訂單數量
CREATE VIEW order_count_view AS
    SELECT
        region, count(id) AS order_cnt
    FROM  order_tab 
    GROUP BY region;

-- 按訂單數量分組統計具有相同訂單數量的地區數量
INSERT INTO region_count_sink 
    SELECT 
        order_cnt,
        count(region) as region_cnt
    FROM order_count_view 
    GROUP BY order_cnt;

上面查詢SQL的程式碼結構如下(這個圖示在Alibaba StreamCompute的整合IDE環境生成的,瞭解更多):
image

上面SQL中我們發現有兩層查詢計算邏輯,第一個查詢計算邏輯是與SOURCE相連的按地區統計訂單數量的分組統計,第二個查詢計算邏輯是在第一個查詢產出的動態表上面進行按訂單數量統計地區數量的分組統計,我們一層一層分析。

錯誤處理

  • 第一層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region; 
    image
  • 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;
    image

按照第一層分析的結果,再分析第二層產出的結果,我們分析的過程是對的,但是最終寫到sink表的計算結果是錯誤的,那我們錯在哪裡了呢?

其實當 (SH,2)這條記錄來的時候,以前來過的(SH, 1)已經是髒資料了,當(BJ, 2)來的時候,已經參與過計算的(BJ, 1)也變成髒資料了,同樣當(BJ, 3)來的時候,(BJ, 2)也是髒資料了,上面的分析,沒有處理髒資料進而導致最終結果的錯誤。那麼Apache Flink內部是如何正確處理的呢?

正確處理

  • 第一層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
    image
  • 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;
    image

上面我們將有更新的事件進行打標的方式來處理髒資料,這樣在Apache Flink內部計算的時候 運算元會根據事件的打標來處理事件,在aggregate function中有兩個對應的方法(retract和accumulate)來處理不同標識的事件,如上面用到的count AGG,內部實現如下:

def accumulate(acc: CountAccumulator): Unit = {
    acc.f0 += 1L // acc.f0 儲存記數
}

def retract(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 -= 1L //acc.f0 儲存記數
    }
}

Apache Flink內部這種為事件進行打標的機制叫做 retraction。retraction機制保障了在流上已經流轉到下游的髒資料需要被撤回問題,進而保障了持續查詢的正確語義。

Apache Flink Connector 型別

本篇一開始就對比了MySQL的資料儲存和Apache Flink資料儲存的區別,Apache Flink目前是一個計算平臺,將資料的儲存以高度抽象的外掛機制與各種已有的資料儲存無縫對接。目前Apache Flink中將資料外掛稱之為連結器Connector,Connnector又按資料的讀和寫分成Soruce(讀)和Sink(寫)兩種型別。對於傳統資料庫表,PK是一個很重要的屬性,在頻繁的按某些欄位(PK)進行更新的場景,在表上定義PK非常重要。那麼作為完全支援ANSI-SQL的Apache Flink平臺在Connector上面是否也支援PK的定義呢?

Apache Flink Source

現在(2018.11.5)Apache Flink中用於資料流驅動的Source Connector上面無法定義PK,這樣在某些業務場景下會造成資料量較大,造成計算資源不必要的浪費,甚至有聚合結果不是使用者“期望”的情況。我們以雙流JOIN為例來說明:

SQL:

CREATE TABLE inventory_tab(
   product_id VARCHAR,
   product_count BIGINT
); 

CREATE TABLE sales_tab(
   product_id VARCHAR,
   sales_count BIGINT
  ) ;

CREATE TABLE join_sink(
   product_id VARCHAR, 
   product_count BIGINT,
   sales_count BIGINT,
   PRIMARY KEY(product_id)
);

CREATE VIEW join_view AS
    SELECT
        l.product_id, 
        l.product_count,
        r.sales_count
    FROM inventory_tab l 
        JOIN  sales_tab r 
        ON l.product_id = r.product_id;

INSERT INTO join_sink 
  SELECT 
      product_id, 
      product_count,
      sales_count
  FROM join_view ;
  

程式碼結構圖:

image

實現示意圖:

image

上圖描述了一個雙流JOIN的場景,雙流JOIN的底層實現會將左(L)右(R)兩面的資料都持久化到Apache Flink的State中,當L流入一條事件,首先會持久化到LState,然後在和RState中儲存的R中所有事件進行條件匹配,這樣的邏輯如果R流product_id為P001的產品銷售記錄已經流入4條,L流的(P001, 48) 流入的時候會匹配4條事件流入下游(join_sink)。

問題

上面雙流JOIN的場景,我們發現其實inventory和sales表是有業務的PK的,也就是兩張表上面的product_id是唯一的,但是由於我們在Sorure上面無法定義PK欄位,表上面所有的資料都會以append only的方式從source流入到下游計算節點JOIN,這樣就導致了JOIN內部所有product_id相同的記錄都會被匹配流入下游,上面的例子是 (P001, 48) 來到的時候,就向下遊流入了4條記錄,不難想象每個product_id相同的記錄都會與歷史上所有事件進行匹配,進而操作下游資料壓力。

那麼這樣的壓力是必要的嗎?從業務的角度看,不是必要的,因為對於product_id相同的記錄,我們只需要對左右兩邊最新的記錄進行JOIN匹配就可以了。比如(P001, 48)到來了,業務上面只需要右流的(P001, 22)匹配就好,流入下游一條事件(P001, 48, 22)。 那麼目前在Apache Flink上面如何做到這樣的優化呢?

解決方案

上面的問題根本上我們要構建一張有PK的動態表,這樣按照業務PK進行更新處理,我們可以在Source後面新增group by 操作生產一張有PK的動態表。如下:(如下DDL和LAST_VALUE 是Alibaba內部對Flink的加強,目前還沒有推回社群,大家慢慢等待,大家想體驗可以登入阿里雲流計算平臺,詳見

SQL:

CREATE TABLE inventory_tab(
   product_id VARCHAR,
   product_count BIGINT
  ) 

 CREATE TABLE sales_tab(
   product_id VARCHAR,
   sales_count BIGINT
  )
CREATE VIEW inventory_view AS
    SELECT 
    product_id,
    LAST_VALUE(product_count) AS product_count
    FROM inventory_tab
    GROUP BY product_id;

CREATE VIEW sales_view AS
    SELECT 
    product_id,
    LAST_VALUE(sales_count) AS sales_count
    FROM sales_tab
    GROUP BY product_id;

CREATE TABLE join_sink(
   product_id VARCHAR, 
   product_count BIGINT,
   sales_count BIGINT,
   PRIMARY KEY(product_id)
)WITH (
    type = 'print'
) ;

CREATE VIEW join_view AS
    SELECT
        l.product_id, 
        l.product_count,
        r.sales_count
    FROM inventory_view l 
        JOIN  sales_view r 
        ON l.product_id = r.product_id;

 INSERT INTO join_sink 
  SELECT 
      product_id, 
      product_count,
      sales_count
  FROM join_view ;

程式碼結構:

image

實現示意圖:
image

如上方式可以將無PK的source經過一次節點變成有PK的動態表,以Apache Flink的retract機制和業務要素解決資料瓶頸,減少計算資源的消耗。

說明1: 上面方案LAST_VALUE是Alibaba內部對Flink的增強功能,社群還沒有支援。

Apache Flink Sink

在Apache Flink上面可以根據實際外部儲存的特點(是否支援PK),以及整體job的執行plan來動態推導Sink的執行模式,具體有如下三種類型:

  • Append 模式 - 該模式使用者在定義Sink的DDL時候不定義PK,在Apache Flink內部生成的所有隻有INSERT語句;
  • Upsert 模式 - 該模式使用者在定義Sink的DDL時候可以定義PK,在Apache Flink內部會根據事件打標(retract機制)生成INSERT/UPDATE和DELETE 語句,其中如果定義了PK, UPDATE語句按PK進行更新,如果沒有定義PK UPDATE會按整行更新;
  • Retract 模式 - 該模式下會產生INSERT和DELETE兩種資訊,Sink Connector 根據這兩種資訊構造對應的資料操作指令;

小結

本篇以MySQL為例介紹了傳統資料庫的靜態查詢和利用MySQL的Trigger+DML操作來模擬持續查詢,並介紹了Apache Flink上面利用增量模式完成持續查詢,並以雙流JOIN為例說明了持續查詢可能會遇到的問題,並且介紹Apache Flink以為事件打標產生delete事件的方式解決持續查詢的問題,進而保證語義的正確性,完美的在流計算上支援續查詢。