4 步搞定 Hive 增量更新
Hive 的更新很有趣。
Hive 的表有兩種,一種是 managed table, 一種是 external table.
managed table 是 Hive 自動幫我們維護的表,自動分割底層儲存檔案,自動分割槽,這些自動化的操作,都是 Hive 封裝了與 Hadoop 互動的介面。
external table 只是一種在 Hive 維護的與外部檔案的對映。
managed table 與 external table 最大的區別在於刪除的時候,external table 預設情況下只是刪除表定義,而資料依舊在hadoop 上儲存著;managed table 則是表定義連著表資料一起被刪除了。
早期的時候, Hive 支援的表操作只有兩種:OverWrite 和 Appand
Overwrite 並不是對某一行的資料做更新,而是對整張表做覆蓋,所以感覺上 Hive 更像是在做 ETL 裡面的 Staging, 而不像是最終儲存計算結果的地方。Hive 超強的計算能力可以做為大資料量轉換的工具,最終結果將被送到關係型資料庫或者其他 Hive 例項上儲存。
hortonworks 有一篇提出相關解決方案的文章,介紹了 4步走解決增量更新 Hive 表:
url如下:
Ingest
Reconcile
Compact
Purge
過程中,用到了四個 Hive 表,分別是:
base_table: 初始化裝載源庫來的表資料,表示最新資料
incremental_table:用來裝載上一次增量更新以來,發生過更改的資料,包括新增,更新,和刪除
reconcile_view:以 base_table, incremental_table 計算出來的最新資料,涉及到的操作,有刪除,更新,和新增。每一次都要重複計算是不是有些多餘,浪費很多對沒有變更的資料的重複計算。如果有對資料有分割槽,只要對有資料更新的分割槽做增量更新,會有很大效率的提高。
reporting_table:將reconcile_view的資料,裝載到 reporting_table中,用它來替換掉 base_table中的資料。
一) 取決於源資料庫的服務是否支援直連抽取資料,可以有兩種方法完成第一步 ingest, 即 Extract.
File Processing: 由源資料庫自發的輸出,以檔案方式在合理的時間視窗匯出
RDBMS Processing (Database Client based ETL): 由 Sqoop 來完成抽取; ETL 工具, kettle, Informatica等;
File Processing :
由資料庫軟體自帶的匯入匯出,將檔案匯出一定分隔符分割的文字檔案
將這些文字檔案放到 Hive 對映的資料夾下面
RDBMS Processing (Database Client based ETL):
- SQOOP: 既可以實現初始化匯入,也可以完成增量匯入,增量匯入的實現,依賴於Sqoop 本身的 check-sum 機制。check-sum 是對 Hive 表中的一行用來做校驗資料做了 hash 計算,根據匹配是否來做增量更新。
以下是文章的原文,展示了 Sqoop 的具體用法:
SQOOP is the JDBC-based utility for integrating with traditional
databases.A SQOOP Import allows for the movement of data into either HDFS (a
delimited format can be defined as part of the Import definition) or
directly into a Hive table.The entire source table can be moved into HDFS or Hive using the
“–table” parameter.
sqoop import
--connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--table SOURCE_TBL
--target-dir /user/hive/incremental_table -m 1
注**
–table source_TBL: 是指關係型資料庫裡的原表
–target-dir :Hive 中表對應的儲存目錄
After the initial import, subsequent imports can leverage SQOOP’s native support for “Incremental Import” by using the “check-column”, “incremental” and “last-value” parameters.
sqoop import
--connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--table SOURCE_TBL
--target-dir /user/hive/incremental_table -m 1
--check-column modified_date
--incremental lastmodified
--last-value {last_import_date|last_import_value}
注**
–check-column : 是指定原表中用來做增量判斷條件的那一欄位
–incremental lastmodified: 指定增量的模式,append 或者 lastmodified.
在資料倉庫中,無論是維度表還是事實表,我們總會設計一欄自增列,作為代理鍵或者主鍵。這個時候這些鍵值總是自增長的,因此適合採用 append 形式,指定check-sum 列為自增列,如果有比 {last_import_value}大的值,就會被 sqoop 匯入進來;
在設計資料庫的時候,為了審計,我們通常也會設計一列為 timestamp 列,每對一行做了修改,就會重置這列 timestamp 為當前時間戳。如果是針對這類行資料,我們要指定的便是 lastmodified, 配合 check-sum 設定為 timestamp 列,sqoop 就會匯入比{last_import_date} 大的資料行。
–last-value { last_import_date } 這是需要從程式外面傳進來的
考慮到這是增量更新,那麼理應把 sqoop 做成一個 Job 來自動化執行,並且記錄每一次的時間,作為下次執行時要傳入的 {last_import_date} 或者{last_import_value}
Alternately, you can leverage the “query” parameter, and have SQL select statements limit the import to new or changed records only.
sqoop import
--connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--target-dir /user/hive/incremental_table -m 1
--query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS’
Note: For the initial load, substitute “base_table” for “incremental_table”. For all subsequent loads, use “incremental_table”.
注**
這是前面兩種全量和增量的替代寫法,用指定的查詢,從原關係型資料庫匯出資料,不同的是,全量的時候,要指定匯入的 Hive 目標表是 base_table, 而增量的時候,匯入的是 incremental_table.
二) Reconciliation 將新舊資料綜合起來
初始化時,裝載最終的目標表沒有多少難度。
在這段中,主要解決的問題是增量與初始化的融合。
初始化的資料,儲存在 base_table 中, 而增量資料我們已經裝載到了 incremental_table 中。
將兩者的資料合二為一,就可以生成與源資料庫一致的最新資料。
前提是源資料庫的任何資料行不接受硬刪除即delete 操作,而是在行上打了一個軟刪除的標籤,表示該行已刪除。
如果是做了硬刪除,那麼同時也要做好刪除的審計,將刪除的資料行放入審計表中,一同傳送給 incremental_table .
base_table
CREATE TABLE base_table (
id string,
field1 string,
field2 string,
field3 string,
field4 string,
field5 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/base_table';
incremental_table
CREATE EXTERNAL TABLE incremental_table (
id string,
field1 string,
field2 string,
field3 string,
field4 string,
field5 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/incremental_table';
reconcile_view
CREATE VIEW reconcile_view AS
SELECT t2.id, t2.field1, t2.field2, t2.field3, t2.field4, t2.field5, t2.modified_date FROM
(SELECT *,ROW_NUMBER() OVER (PARTITION BY id ORDER BY modified_date DESC) rn
FROM (SELECT * FROM base_table
UNION ALL
SELECT * FROM incremental_table)
t1) t2
WHERE rn = 1;
從最後一個view定義來解說,incremental_table 必須擁有增量記錄的全部,因此硬刪除操作就不會反應在 incremental_table 裡頭。
但是 reconcile_view 所涉及的量畢竟有限,浪費明明不會更改的那部分資料的計算。
因此如果能做好分割槽,僅僅對某幾個分割槽做全量更新會更高效。
三) Compact: 物化檢視,即將reconciliation_view 裝載到 reporting_table 裡面去
reporting_table
DROP TABLE reporting_table;
CREATE TABLE reporting_table AS
SELECT * FROM reconcile_view;
首先是要將之前的 reporting_table 刪除,再重建 reporting _table, 用 reconciliation_view 填充這張表。
在這張表的基礎上,可以做很多聚合,過濾等操作,進行資料二次加工。
四) Purge: 將多餘的表資料清空
base_table :應該當換成 reporting_table 裡面的資料
incremental_table: 清空
DROP TABLE base_table;
CREATE TABLE base_table AS
SELECT * FROM reporting_table;
hadoop fs –rm –r /user/hive/incremental_table/*
總結:
Oozie 可以將這4步統一做成一個工作流,方便排程
可以用指令碼自定義工作流,就像資料倉庫的 ETL 一樣