kylin cube構建詳解
在使用Kylin的時候,最重要的一步就是建立cube的模型定義,即指定度量和維度以及一些附加資訊,然後對cube進行build,當然我們也可以根據原始表中的某一個string欄位(這個欄位的格式必須是日期格式,表示日期的含義)設定分割槽欄位,這樣一個cube就可以進行多次build,每一次的build會生成一個segment,每一個segment對應著一個時間區間的cube,這些segment的時間區間是連續並且不重合的,對於擁有多個segment的cube可以執行merge,相當於將一個時間區間內部的segment合併成一個。下面從原始碼開始分析cube的build和merge過程。本文基於Kylin-1.0-incubating版本,對於Kylin的介紹可以參見:
入口介紹
在kylin的web頁面上建立完成一個cube之後可以點選action下拉框執行build或者merge操作,這兩個操作都會呼叫cube的rebuild介面,呼叫的引數包括:1、cube名,用於唯一標識一個cube,在當前的kylin版本中cube名是全域性唯一的,而不是每一個project下唯一的;2、本次構建的startTime和endTime,這兩個時間區間標識本次構建的segment的資料來源只選擇這個時間範圍內的資料;對於BUILD操作而言,startTime是不需要的,因為它總是會選擇最後一個segment的結束時間作為當前segment的起始時間。3、buildType標識著操作的型別,可以是”BUILD”、”MERGE”和”REFRESH”。
這些操作的統一入口就是JobService.submitJob函式,該函式首先取出該cube所有關聯的構建cube的job,並且判斷這些job是否有處於READY、RUNNING、ERROR狀態,如果處於該狀態意味著這個job正在執行或者可以之後被resume執行,做這種限制的原因不得而知(可能是構建的區間是基於時間吧,需要對一個cube並行的構建多個segment(時間區間的資料)的需求並不明顯)。所以如果希望build或者merge cube,必須將未完成的cube的操作執行discard操作。然後根據操作型別執行具體的操作:
1. 如果是BUILD,如果這個cube中包含distinct count聚合方式的度量並且這個cube中已經存在其他segment,則執行appendAndMergeSegments函式,否則執行buildJob函式。
2. 如果是MERGE操作則執行mergeSegments函式。
3. 如果是REFRESH,則同樣執行buildJob函式。為這個時間區間的segment重新構建。
buildJob函式構建一個新的segment,mergeSegments函式合併一個時間區間內的所有segments,appendAndMergeSegments函式則首先根據最後一個segment的時間區間的end值build一個新的segment然後再將所有的時間區間的segments進行合併(為什麼包含distinct count的聚合函式的cube的構建一定要進行合併呢?這應該是有distinct-count使用的hyperloglog
BUILD操作
Build操作是構建一個cube指定時間區間的資料,由於kylin基於預計算的方式提供資料查詢,構建操作是指將原始資料(儲存在Hadoop中,通過Hive獲取)轉換成目標資料(儲存在Hbase中)的過程。主要的步驟可以按照順序分為四個階段:1、根據使用者的cube資訊計算出多個cuboid檔案,2、根據cuboid檔案生成htable,3、更新cube資訊,4、回收臨時檔案。每一個階段操作的輸入都需要依賴於上一步的輸出,所以這些操作全是順序執行的。
1. 計算cuboid檔案
在kylin的CUBE模型中,每一個cube是由多個cuboid組成的,理論上有N個普通維度的cube可以是由2的N次方個cuboid組成的,那麼我們可以計算出最底層的cuboid,也就是包含全部維度的cuboid(相當於執行一個group by全部維度列的查詢),然後在根據最底層的cuboid一層一層的向上計算,直到計算出最頂層的cuboid(相當於執行了一個不帶group by的查詢),其實這個階段kylin的執行原理就是這個樣子的,不過它需要將這些抽象成mapreduce模型,提交mapreduce作業執行。
1.1 生成原始資料(Create Intermediate Flat Hive Table)
這一步的操作是根據cube的定義生成原始資料,這裡會新建立一個hive外部表,然後再根據cube中定義的星狀模型,查詢出維度(對於DERIVED型別的維度使用的是外來鍵列)和度量的值插入到新建立的表中,這個表是一個外部表,表的資料檔案(儲存在HDFS)作為下一個子任務的輸入,它首先根據維度中的列和度量中作為引數的列得到需要出現在該表中的列,然後執行三步hive操作,這三步hive操作是通過hive -e的方式執行的shell命令。 1. drop TABLE IF EXISTS xxx. 2. CREATE EXTERNAL TABLE IF NOT EXISTS xxx() ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\177’ STORED AS SEQUENCEFILE LOCATION xxxx,其中表名是根據當前的cube名和segment的uuid生成的,location是當前job的臨時檔案,只有當insert插入資料的時候才會建立,注意這裡每一行的分隔符指定的是’\177’(目前是寫死的,十進位制為127). 3. 插入資料,在執行之前需要首先設定一些配置項,這些配置項通過hive的SET命令設定,是根據這個cube的job的配置檔案(一般是在kylin的conf目錄下)設定的,最後執行的是INSERT OVERWRITE TABLE xxx SELECT xxxx語句,SELECT子句中選出cube星狀模型中事實表與維度表按照設定的方式join之後的出現在維度或者度量引數中的列(特殊處理derived列),然後再加上使用者設定的where條件和partition的時間條件(根據輸入build的引數). 需要注意的是這裡無論使用者設定了多少維度和度量,每次join都會使用事實表和所有的維度表進行join,這可能造成不必要的效能損失(多一個join會影響hive效能,畢竟要多讀一些檔案)。這一步執行完成之後location指定的目錄下就有了原始資料的檔案,為接下來的任務提供了輸入。
1.2 建立事實表distinct column檔案(Extract Fact Table Distinct Columns)
在這一步是根據上一步生成的hive表計算出還表中的每一個出現在事實表中的度量的distinct值,並寫入到檔案中,它是啟動一個MR任務完成的,MR任務的輸入是HCatInputFormat,它關聯的表就是上一步建立的臨時表,這個MR任務的map階段首先在setup函式中得到所有度量中出現在事實表的度量在臨時表的index,根據每一個index得到該列在臨時表中在每一行的值value,然後將<index, value>
作為mapper的輸出,該任務還啟動了一個combiner,它所做的只是對同一個key的值進行去重(同一個mapper的結果),reducer所做的事情也是進行去重(所有mapper的結果),然後將每一個index對應的值一行行的寫入到以列名命名的檔案中。如果某一個維度列的distinct值比較大,那麼可能導致MR任務執行過程中的OOM。
對於這一步我有一個疑問就是既然所有的原始資料都已經通過第一步存入到臨時hive表中了,我覺得接下來就不用再區分維度表和事實表了,所有的任務都基於這個臨時表,那麼這一步就可以根據臨時表計算出所有的維度列的distinct column值,但是這裡僅僅針對出現在事實表上的維度,不知道這樣做的原因是什麼?難道是因為在下一步會單獨計算維度表的dictionary以及snapshot?
1.3 建立維度詞典(Build Dimension Dictionary)
這一步是根據上一步生成的distinct column檔案和維度表計算出所有維度的詞典資訊,詞典是為了節約儲存而設計的,用於將一個成員值編碼成一個整數型別並且可以通過整數值獲取到原始成員值,每一個cuboid的成員是一個key-value形式儲存在hbase中,key是維度成員的組合,但是一般情況下維度是一些字串之類的值(例如商品名),所以可以通過將每一個維度值轉換成唯一整數而減少記憶體佔用,在從hbase查找出對應的key之後再根據詞典獲取真正的成員值。 這一步是在kylin程序內的一個執行緒中執行的,它會建立所有維度的dictionary,如果是事實表上的維度則可以從上一步生成的檔案中讀取該列的distinct成員值(FileTable),否則則需要從原始的hive表中讀取每一列的資訊(HiveTable),根據不同的源(檔案或者hive表)獲取所有的列去重之後的成員列表,然後根據這個列表生成dictionary,kylin中針對不同型別的列使用不同的實現方式,對於time之類的(date、time、dtaetime和timestamp)使用DateStrDictionary,這裡目前還存在著一定的問題,因為這種編碼方式會首先將時間轉換成‘yyyy-MM-dd’的格式,會導致timestamp之類的精確時間失去天以後的精度。針對數值型的使用NumberDictionary,其餘的都使用一般的TrieDictionary(字典樹)。這些dictionary會作為cube的元資料儲存的kylin元資料庫裡面,執行query的時候進行轉換。 之後還需要計算維度表的snapshotTable,每一個snapshot是和一個hive維度表對應的,生成的過程是:首先從原始的hive維度表中順序得讀取每一行每一列的值,然後使用TrieDictionary方式對這些所有的值進行編碼,這樣每一行每一列的之都能夠得到一個編碼之後的id(相同的值id也相同),然後再次讀取原始表中每一行的值,將每一列的值使用編碼之後的id進行替換,得到了一個只有id的新表,這樣同時儲存這個新表和dictionary物件(id和值得對映關係)就能夠儲存整個維度表了,同樣,kylin也會將這個資料儲存元資料庫中。 針對這一步需要注意的問題:首先,這一步的兩個步驟都是在kylin程序的一個執行緒中執行的,第一步會載入某一個維度的所有distinct成員到記憶體,如果某一個維度的cardinality比較大 ,可能會導致記憶體出現OOM,然後在建立snapshotTable的時候會限制原始表的大小不能超過配置的一個上限值,如果超過則會執行失敗。但是應該強調的是這裡載入全部的原始維度表更可能出現OOM。另外,比較疑惑的是:1、為什麼不在上一步的MR任務中直接根據臨時表中的資料生成每一個distinct column值,而是從原始維度表中讀取?2、計算全表的dictionary是為了做什麼?我目前只瞭解對於drived維度是必要儲存主鍵和列之間的對映,但是需要儲存整個維度表?!
1.4 計算生成BaseCuboid檔案(Build Base Cuboid Data)
何謂Base cuboid呢?假設一個cube包含了四個維度:A/B/C/D,那麼這四個維度成員間的所有可能的組合就是base cuboid,這就類似在查詢的時候指定了select count(1) from xxx group by A,B,C,D;這個查詢結果的個數就是base cuboid集合的成員數。這一步也是通過一個MR任務完成的,輸入是臨時表的路徑和分隔符,map對於每一行首先進行split,然後獲取每一個維度列的值組合作為rowKey,但是rowKey並不是簡單的這些維度成員的內容組合,而是首先將這些內容從dictionary中查找出對應的id,然後組合這些id得到rowKey,這樣可以大大縮短hbase的儲存空間,提升查詢效能。然後在查詢該行中的度量列,根據cube定義中度量的函式返回對該列計算之後的值。這個MR任務還會執行combiner過程,執行邏輯和reducer相同,在reducer中的key是一個rowKey,value是相同的rowKey的measure組合的陣列,reducer回分解出每一個measure的值,然後再根據定義該度量使用的聚合函式計算得到這個rowKey的結果,其實這已經類似於hbase儲存的格式了。
1.5 計算第N層cuboid檔案(Build N-Dimension Cuboid Data)
這一個流程是由多個步驟的,它是根據維度組合的cuboid的總數決定的,上一層cuboid執行MR任務的輸入是下一層cuboid計算的輸出,由於最底層的cuboid(base)已經計算完成,所以這幾步不需要依賴於任何的hive資訊,它的reducer和base cuboid的reducer過程基本一樣的(相同rowkey的measure執行聚合運算),mapper的過程只需要根據這一行輸入的key(例如A、B、C、D中某四個成員的組合)獲取可能的下一層的的組合(例如只有A、B、C和B、C、D),那麼只需要將這些可能的組合提取出來作為新的key,value不變進行輸出就可以了。 舉個例子,假設一共四個維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個measure(對於這列V,計算sum(V)),這裡忽略dictionary編碼。原始表如下:
A | B | C | D | V |
---|---|---|---|---|
A1 | B1 | C1 | D1 | 2 |
A1 | B2 | C1 | D1 | 3 |
A2 | B1 | C1 | D1 | 5 |
A3 | B1 | C1 | D1 | 6 |
A3 | B2 | C1 | D1 | 8 |
那麼base cuboid最終的輸出如下
(<A1、B1、C1、D1>
、2)
(<A1、B2、C1、D1>
, 3)
(<A2、B1、C1、D1>
, 5)
(<A3、B1、C1、D1>
, 6)
(<A3、B2、C1、D1>
, 8)
那麼它作為下面一個cuboid的輸入,對於第一行輸入
(<A1、B1、C1、D1>
, 2),mapper執行完成之後會輸出
(<A1、B1、C1>
, 2)、
(<A1、B1、D1>
, 2)、
(<A1、C1、D1>
, 2)、
(<B1、C1、D1>
, 2)這四項,同樣對於其他的內一行也會輸出四行,最終他們經過reducer的聚合運算,得到如下的結果:
(<A1、B1、C1>
, 2)
(<A1、B1、D1>
, 2)
(<A1、C1、D1>
, 2 + 3)
(<B1、C1、D1>
,2 + 5 +6)
...
這樣一次將下一層的結果作為輸入計算上一層的cuboid成員,直到最頂層的cuboid,這一個層cuboid只包含一個成員,不按照任何維度進行group by。
上面的這些步驟用於生成cuboid,假設有N個維度(對於特殊型別的),那麼就需要有N +1層cuboid,每一層cuboid可能是由多個維度的組合,但是它包含的維度個數相同。
2 準備輸出
在上面幾步中,我們已經將每一層的cuboid計算完成,每一層的cuboid檔案都是一些cuboid的集合,每一層的cuboid的key包含相同的維度個數,下面一步就是將這些cuboid檔案匯入到hbase中。
2.1 計算分組
這一步的輸入是之前計算的全部的cuboid檔案,按照cuboid檔案的順序(層次的順序)一次讀取每一個key-value,再按照key-value的形式統計每一個key和value佔用的空間大小,然後以GB為單位,mapper階段的輸出是每當統計到1GB的資料,將當前的這個key和當前資料量總和輸出,在reducer階段根據使用者建立cube時指定的cube大小(SMALL,MEDIUM和LARGE)和總的大小計算出實際需要劃分為多少分割槽,這時還需要參考最多分割槽數和最少分割槽數進行計算,再根據實際資料量大小和分割槽數計算出每一個分割槽的邊界key,將這個key和對應的分割槽編號輸出到最終檔案中,為下一步建立htable做準備。
2.2 建立HTable
這一步非常簡單,根據上一步計算出的rowKey分佈情況(split陣列)建立HTable,建立一個HTable的時候還需要考慮一下幾個事情:1、列組的設定,2、每一個列組的壓縮方式,3、部署coprocessor,4、HTable中每一個region的大小。在這一步中,列組的設定是根據使用者建立cube時候設定的,在hbase中儲存的資料key是維度成員的組合,value是對應聚合函式的結果,列組針對的是value的,一般情況下在建立cube的時候只會設定一個列組,該列包含所有的聚合函式的結果;在建立HTable時預設使用LZO壓縮,如果不支援LZO則不進行壓縮,在後面kylin的版本中支援更多的壓縮方式;kylin強依賴於hbase的coprocessor,所以需要在建立HTable為該表部署coprocessor,這個檔案會首先上傳到HBase所在的HDFS上,然後在表的元資訊中關聯,這一步很容易出現錯誤,例如coprocessor找不到了就會導致整個regionServer無法啟動,所以需要特別小心;region的劃分已經在上一步確定了,所以這裡不存在動態擴充套件的情況,所以kylin建立HTable使用的介面如下: public void createTable( final HTableDescriptor desc , byte [][] splitKeys)
2.3 構建hfile檔案
建立完了HTable之後一般會通過插入介面將資料插入到表中,但是由於cuboid中的資料量巨大,頻繁的插入會對Hbase的效能有非常大的影響,所以kylin採取了首先將cuboid檔案轉換成HTable格式的Hfile檔案,然後在通過bulkLoad的方式將檔案和HTable進行關聯,這樣可以大大降低Hbase的負載,這個過程通過一個MR任務完成。 這個任務的輸入是所有的cuboid檔案,在mapper階段根據每一個cuboid成員的key-value輸出,如果cube定義時指定了多個列組,那麼同一個key要按照不同列組中的值分別輸出,例如在cuboid檔案中存在一行cuboid=1,key=1,value=sum(cost),count(1)的資料,而cube中將這兩個度量劃分到兩個列組中,這時候對於這一行資料,mapper的輸出為<1, sum(cost)>和<1,count(1)>。reducer使用的是org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer,它會按照行排序輸出,如果一行中包含多個值,那麼會將這些值進行排序再輸出。輸出的格式則是根據HTable的檔案格式定義的。
2.4 BulkLoad檔案
這一步將HFile檔案load到HTable中,因為load操作會將原始的檔案刪除(相當於remove),在操作之前首先將所有列組的Hfile的許可權都設定為777,然後再啟動LoadIncrementalHFiles任務執行load操作,它的輸入為檔案的路徑和HTable名,這一步完全依賴於HBase的工具。這一步完成之後,資料已經儲存到HBase中了,key的格式由cuboid編號+每一個成員在字典樹的id組成,value可能儲存在多個列組裡,包含在原始資料中按照這幾個成員進行GROUP BY計算出的度量的值。
3 收尾工作
執行完上一步就已經完成了從輸入到輸出的計算過程,接下來要做的就是一些kylin內部的工作,分別是更新元資料,更新cube狀態,垃圾資料回收。
3.1 更新狀態
這一步主要是更新cube的狀態,其中需要更新的包括cube是否可用、以及本次構建的資料統計,包括構建完成的時間,輸入的record數目,輸入資料的大小,儲存到Hbase中資料的大小等,並將這些資訊持久到元資料庫中。
3.2 垃圾檔案回收
這一步是否成功對正確性不會有任何影響,因為經過上一步之後這個segment就可以在這個cube中被查詢到了,但是在整個執行過程中產生了很多的垃圾檔案,其中包括:1、臨時的hive表,2、因為hive表是一個外部表,儲存該表的檔案也需要額外刪除,3、fact distinct 這一步將資料寫入到HDFS上為建立詞典做準備,這時候也可以刪除了,4、rowKey統計的時候會生成一個檔案,此時可以刪除。5、生成HFile時檔案儲存的路徑和hbase真正儲存的路徑不同,雖然load是一個remove操作,但是上層的目錄還是存在的,也需要刪除。這一步kylin做的比較簡單,並沒有完全刪除所有的臨時檔案,其實在整個計算過程中,真正還需要保留的資料只有多個cuboid檔案(需要增量build的cube),這個因為在不同segment進行merge的時候是基於cuboid檔案的,而不是根據HTable的。
在Kylin-1.x版本中,整個cube的一個build的過程大概就是這樣,這樣的一個build只不過是生成一虐segment,而當一個cube中存在多個segment時可能需要將它們進行merge,merge的過程和build的流程大致是相同的,不過它不需要從頭開始,只需要對字典進行merge,然後在對cuboid檔案進行merge,最後生成一個新的HTable。 但是在Kylin-2.x版本中,整個家溝發生了很大的變化,build的引擎也分成了多套,分別是原始的MR引擎,基於Fast Cubing的MR引擎和Spark引擎,這使得build進行的更迅速,大大降低等待時間,後面會持續的再對新的引擎進行分析。
--------------------- 本文來自 奼紫_嫣紅 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/SONGCHUNHONG/article/details/78028951?utm_source=copy