1. 程式人生 > 其它 >實時數倉系統構建

實時數倉系統構建

背景介紹

當前的資料報表服務採用定時離計算的方式構建數倉,但隨著業務對實時性的要求變高,需要實現一套實時入庫方案。

問題分析

對外提供服務的大寬表分基礎表,事實表和統計表三類,事實表和緯度表都包含緯度資訊。在實時流處理過程中,每來一條事實資料,用緯度id查詢緯度資料,將緯度資料和事實資料寫入具體的業務topic中。這裡最核心的步驟是用緯度id查詢緯度資料。因為實時資料的吞吐量比較大,如果每來條一條資料去mysql中查詢一次,mysql扛不抗的住還好說,每次查詢mysql的耗時是不能忍受的。一般的做法都是利用cache快取mysql資料,實時資料查詢緯度資料的時候先從cache中查,如果查不到再從db中查,將查到的資料放入cache,方便下次使用。 上述方案雖然解決了事實和緯度資料join效能的問題,但同樣也帶來了資料不一致的問題。也就是說存在mysql資料已更改,但cache中還是舊資料的情況,那麼新生成的大寬表緯度資料是過期資料。其實不止新生成的大寬表緯度資料是過期資料,之前大寬表中涉及到的緯度資料都是過期資料。

業務庫資料存在mysql中,olap引擎用的是clickhouse。中間資料存在kafka或者mysql。雖然維表資料可以用記憶體資料庫(redis,memcache等)儲存,由於需要引入其他元件,相對比較麻煩,而mysql的效能也不差,所以維表這裡全都是mysql儲存的。中間資料涉及到部分欄位更新的儲存在mysql,全量資料更新的用kafka。

解決方案探索

既然大寬表中存在過期的緯度資訊,不產生錯誤資料或者把錯誤資料糾正一下就可以了。

方案一

採用lamda架構,開發兩套程式,一套實時,一套離線。實時資料join維表時利用快照的方式讀取,保證及時性。離線每隔一段時間執行一次,保證正確性。

方案二

方案一的缺點是需要開發兩套程式,維護起來不方便。有沒有可能用一套程式來實現?kappa架構就是來幹這個的。Kappa架構的核心思想:

1、用Kafka或者類似MQ佇列系統收集各種各樣的資料,需要幾天的資料量就儲存幾天。

2、當需要全量重新計算時,重新起一個流計算例項,從頭開始進行處理,並輸出到一個新的結果儲存中。

3、當新的例項做完後,停止老的流計算例項,並把老的一些結果刪除。

在當前的應用場景,遇到維表資料變更,從頭開始消費資料效率有點低,而且大部分資料是沒有變更的,無需重新消費。我們可以修改下kappa架構,只消費維表的變更資料,將維表的資料更新到大寬表即可。我們對外提供服務的olap引擎,一般只支援全欄位資料覆蓋,不支援部分欄位update。即使支援部分欄位update,當一個維表字段變更涉及到大寬表很多資料時,update的效能會很差,而且還會影響olap引擎對外提供服務的效能。所以一般情況下是根據維表變更資料查出大寬表變更資料,將其寫到訊息佇列中,再消費佇列寫入到大寬表,覆蓋舊資料。假設大寬表join來3張維表的資料,具體流程如下:

1、在一個任務中消費3張維表的topic,將變更資料彙總到一起

2、根據變更的維表資料,從db中查出需要變更的事實資料

3、將需要變更的事實資料,寫到事實資料對應的topic中,複用之前的消費事實資料topic join維表資料的流。

方案三

我們當前遇到的是維表資料變更導致的大寬表資料變更問題。如果大寬表中只存放緯度id,不存放緯度屬性資料,查詢的時候將多張表的資料join一下,返回大寬表資料。

這種方案可以說完美解決了大寬表中緯度更新的問題,而且大寬表不再包含屬性資料,減少了儲存空間。但這種方案對olap引擎的效能要求較高。因為之前是關聯好資料,直接查詢,現在需要查多個數據,再關聯,查詢效能肯定沒之前好。 兩表join的時候,olap引擎一般是將小表放到記憶體中,做mapjoin。如果緯度資料很小,這種方案優勢很大。如果緯度資料較大,需要很好的劃分hash,或者採用其他方案。

目前市面上支援join,且效能不錯的olap引擎,clickhouse和doris都可以。

解決方案

我們的資料需求可以分為三類:

1、mysql的資料原封不動的copy到clickhouse

我們公司內部有一個mysql平臺,可以增量、全量將資料寫到指定的topic,所以這裡我們只有實時方案,消費topic資料寫到clickhouse。如果需要回溯資料,從新全量匯出資料到topic即可。

2、mysql資料join一些緯度,寫到clickhouse

我們這邊的資料量不大,本來是準備用方案三的,但clickhouse join的效能很差,所以放棄方案三。我們這邊的業務場景大都是維表資料很少,事實資料很多,極端情況下會出現一條緯度資料更改,所有大寬表資料都需更改的問題。所以這裡我們選了方案一,實時部分join維錶快照寫資料到clickhouse,保證實時性;離線部分每半小時執行一次,保證正確性。

3、mysql資料做彙總後join緯度,寫到clickhouse

針對於輕度彙總資料,比如說每小時或者每天彙總一次,可以利用flink watermark相關的語法實現,join緯度資料,flink sql支援的也很完善。 針對於重度彙總資料,比如說一個月彙總一次,這類需求通過實時來算不合適(狀態資料過多影響checkpoint;任務重啟恢復比較麻煩),一般都是寫sql離線計算。

一些細節

1、元資料變更問題

任務啟動時從資料地圖讀取表字段資訊,入庫mysql時,按照資料地圖中的欄位資訊入庫到clickhouse。任務執行過程中mysql加了一個欄位,此時由於記憶體中的資料地圖資訊是過期資料,所以還是按照之前的欄位入庫,也就是新欄位沒有入到clickhouse中。 比較笨的方案就是,在資料地圖上面修改欄位資訊後,手動重啟任務,重新載入元資料,可以解決問題。 還可以每隔一段時間讀下資料地圖,更新記憶體中的元資料。 如果嫌每次讀資料地圖代價太大,還可以將資料地圖的變更資訊寫到一個地方,比如說db,定時輪詢db檢測是否有更新,有更新的話就從資料地圖重新載入資料。

2、資料版本迭代問題

假設中間表儲存在kafka,某個時間中間表tableA將欄位A修改為a,那麼訊息佇列中A和a都會存在。但入庫的資料流只會採用一種元資料(要麼是A,要麼是a),所以入庫的資料流不論採用哪種元資料都會報錯。 這種可以通過版本號來解決。 比如說tableA有個版本欄位,當前的版本是1.0,將欄位A修改為a後,版本改為1.1,資料入庫時指定入庫的版本,如果配置為1.1,那麼包含欄位A的資料就會被過濾掉,如果配置為1.0,就會忽略包含欄位a的資料。

3、實時和離線同時更新mysql導致的死鎖問題

如果二者是按照id批量更新mysql資料,問題不大,死鎖發生的概率比較小,可以忍受。 但如果二者是通過唯一索引更新資料,由於唯一索引重複後會加next-key鎖(包含gap鎖),此時發生死鎖的概率非常大。 如果可以從業務上來解決,把按唯一索引更新改為按主鍵更新是最完美的方案。

如果業務上面解決不了,可以採用下面的辦法減少死鎖概率:

1、實時和離線都採用單執行緒寫入db

2、減少單個事務提交的資料量