1. 程式人生 > 其它 >Apache Hudi 介紹與應用

Apache Hudi 介紹與應用

Apache Hudi

Apache Hudi 在基於 HDFS/S3 資料儲存之上,提供了兩種流原語:

  1. 插入更新
  2. 增量拉取

一般來說,我們會將大量資料儲存到HDFS/S3,新資料增量寫入,而舊資料鮮有改動,特別是在經過資料清洗,放入資料倉庫的場景。而且在資料倉庫如 hive中,對於update的支援非常有限,計算昂貴。另一方面,若是有僅對某段時間內新增資料進行分析的場景,則hive、presto、hbase等也未提供原生方式,而是需要根據時間戳進行過濾分析。

在此需求下,Hudi可以提供這兩種需求的實現。第一個是對record級別的更新,另一個是僅對增量資料的查詢。且Hudi提供了對Hive、presto、Spark的支援,可以直接使用這些元件對Hudi管理的資料進行查詢。

儲存型別

我們看一下 Hudi 的兩種儲存型別:

  1. 寫時複製(copy on write):僅使用列式檔案(parquet)儲存資料。在寫入/更新資料時,直接同步合併原檔案,生成新版本的基檔案(需要重寫整個列資料檔案,即使只有一個位元組的新資料被提交)。此儲存型別下,寫入資料非常昂貴,而讀取的成本沒有增加,所以適合頻繁讀的工作負載,因為資料集的最新版本在列式檔案中始終可用,以進行高效的查詢。
  2. 讀時合併(merge on read):使用列式(parquet)與行式(avro)檔案組合,進行資料儲存。在更新記錄時,更新到增量檔案中(avro),然後進行非同步(或同步)的compaction,建立列式檔案(parquet)的新版本。此儲存型別適合頻繁寫的工作負載,因為新記錄是以appending 的模式寫入增量檔案中。但是在讀取資料集時,需要將增量檔案與舊檔案進行合併,生成列式檔案。

檢視

在瞭解這兩種儲存型別後,我們再看一下Hudi支援的儲存資料的檢視(也就是查詢模式):

  1. 讀優化檢視(Read Optimized view):直接query 基檔案(資料集的最新快照),也就是列式檔案(如parquet)。相較於非Hudi列式資料集,有相同的列式查詢效能
  2. 增量檢視(Incremental View):僅query新寫入資料集的檔案,也就是指定一個commit/compaction,query此之後的新資料。
  3. 實時檢視(Real-time View):query最新基檔案與增量檔案。此檢視通過將最新的基檔案(parquet)與增量檔案(avro)進行動態合併,然後進行query。可以提供近實時的資料(會有幾分鐘的延遲)

在以上3種檢視中,“讀優化檢視”與“增量檢視”均可在“寫時複製”與“讀時合併”的儲存型別下使用。而“實時檢視“僅能在”讀時合併“模式下使用。

儲存型別

支援的檢視

寫時複製

讀優化 + 增量

讀時合併

讀優化 + 增量 + 近實時

時間軸

最後介紹一下 Hudi 的核心 —— 時間軸。Hudi 會維護一個時間軸,在每次執行操作時(如寫入、刪除、合併等),均會帶有一個時間戳。通過時間軸,可以實現在僅查詢某個時間點之後成功提交的資料,或是僅查詢某個時間點之前的資料。這樣可以避免掃描更大的時間範圍,並非常高效地只消費更改過的檔案(例如在某個時間點提交了更改操作後,僅query某個時間點之前的資料,則仍可以query修改前的資料)。

使用案例

下面我們嘗試使用Hudi API 進行讀寫。

寫入資料

首先準備資料集,部分條目為:

1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794

875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7

1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10

26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681

1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605

啟動spark-shell,並指定hudi jar包:

spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

載入指定包:

1 2 3 4 5 6 importscala.collection.JavaConversions._ importorg.apache.spark.sql.SaveMode._ importorg.apache.hudi.DataSourceReadOptions._ importorg.apache.hudi.DataSourceWriteOptions._ importorg.apache.hudi.config.HoodieWriteConfig._ importorg.apache.hudi.hive.MultiPartKeysValueExtractor

指定建立的Hudi表名與路徑:

1 2 valtableName="hudi_table" valbasePath="s3://xxxx/xxx"

構造 DataFrame:

1 2 3 4 vallineRDD=sc.textFile("features.txt").map(_.split("\\|")).filter(_.length >6) caseclassRecord(id:Int, name:String, c_class:String, state:String, latitude:Float, longitude:String, elevation:Int) valRecordRDD=lineRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)) valfeatureDF=RecordRDD.toDF

插入資料到 Hudi(以及Hive):

1 2 3 4 5 6 7 8 9 10 11 featureDF.write.format("org.apache.hudi"). option(RECORDKEY_FIELD_OPT_KEY,"c_class"). option(PARTITIONPATH_FIELD_OPT_KEY,"state"). option(PRECOMBINE_FIELD_OPT_KEY,"id"). option(TABLE_NAME, tableName). option(HIVE_SYNC_ENABLED_OPT_KEY,"true"). option(HIVE_TABLE_OPT_KEY,"hivehudi"). option(HIVE_PARTITION_FIELDS_OPT_KEY,"state"). option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName). mode(Overwrite). save(basePath);

我們可以看到目錄結構類似於 Hive:

hudi/hudi_table/AR/44bfae35-056b-4bcd-8970-5f1271c3845d-0_18-215-89206_20191121100011.parquet

hudi/hudi_table/CA/2a591ee9-afa4-48d9-bd16-63376a1b8e06-0_38-215-89226_20191121100011.parquet

hudi/hudi_table/CT/911510f9-0655-405f-afad-be9c15429e81-0_46-215-89234_20191121100011.parquet

表名為hudi_table,分割槽鍵為 state,真正儲存資料的檔案為parquet。

查詢資料

首先載入資料格式:

1 valtoViewDF=spark.read.format("org.apache.hudi").load(basePath +"/*/*")

我們在上面插入資料的時候,同時建立了 Hive 表,所以有以下兩種方式做查詢:

  1. 直接查詢 Hive 表:

  spark.sql("select name from hivehudi where c_class='Summit'").show()

  +--------------------+

  | name|

  +--------------------+

  | High Knob|

  | White Rock Mountain|

  | Open Mine Hill|

2. 使用臨時表:

  roViewDF.registerTempTable("hudi_ro_table")

  spark.sql("select id,name from hudi_ro_table where c_class='Stream'").show()

  +-------+--------------------+

  | id| name|

  +-------+--------------------+

  | 539931| Tiger Point Gully|

  | 871801| Dry Brook|

  | 847407| McClusky Creek|

  | 637687| Shaw Drain|

  | 749747| Duncan Creek|

  |1502779| Brushy Lick|

  …

更新資料

首先我們看一條資料:

spark.sql("select id,name from hudi_ro_table where c_class='Stream' and id=539931").show()

+------+-----------------+

| id| name|

+------+-----------------+

|539931|Tiger Point Gully|

然後更新此資料(更新的資料儲存在一個新的原始檔中):

1 2 3 4 5 6 7 8 9 10 11 12 13 valupdateRDD=sc.textFile("update.txt").map(_.split("\\|")).filter(_.length>6) valupdateDF=updateRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)).toDF updateDF.write.format("org.apache.hudi"). option(RECORDKEY_FIELD_OPT_KEY,"c_class"). option(PARTITIONPATH_FIELD_OPT_KEY,"state"). option(PRECOMBINE_FIELD_OPT_KEY,"id"). option(TABLE_NAME, tableName). option(HIVE_SYNC_ENABLED_OPT_KEY,"true"). option(HIVE_TABLE_OPT_KEY,"hivehudi"). option(HIVE_PARTITION_FIELDS_OPT_KEY,"state"). option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName). mode(Append). save(basePath);

可以看到我們這裡使用的模式由Overwrite 改為了 Append,也就是追加的模式,其餘的基本不變。我們首先分別看一下 hive 表與 hudi 表中的資料變化。

Hive 表中:

spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()

+------+-----------------+

| id| name|

+------+-----------------+

|539931|Tiger Point Gully|

|539931| Tiger-update|

+------+-----------------+

Hudi 表中:

1 2 3 valappViewDF=spark.read.format("org.apache.hudi").load(basePath +"/*/*") appViewDF.registerTempTable("hudi_update_table") spark.sql("select id,name from hudi_update_table where c_class='Stream' and id=539931").show()

+------+-----------------+

| id| name|

+------+-----------------+

|539931|Tiger Point Gully|

|539931| Tiger-update|

+------+-----------------+

可以看到均可以查到更新後的資料。

對資料執行 select * 加上過濾條件:

可以看到表中有2個比較有意思的欄位,分別為:_hoodie_commit_time, _hoodie_commit_seqno

上文我們提到過 Hudi 有一個核心為時間軸,每次執行一個commit時,都會生成一個時間戳。這裡 _hoodie_commit_time 即記錄了commit 的時間戳。進一步的,Hudi 便是基於此實現了增量查詢。

下面我們嘗試一下增量查詢:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 // 獲取 commit 時間戳 valcommits=spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_update_table order by commitTime").map(k=> k.getString(0)).take(3) // 設定起始時間戳為上次時間戳 valbeginTime=commits(commits.length -2) // 增量查詢 valincViewDF=spark. read. format("org.apache.hudi"). option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath);<br> incViewDF.registerTempTable("hudi_incr_table") spark.sql("select * from hudi_incr_table where c_class='Stream' and id=539931").show()

這裡我們使用增量查詢的選項 VIEW_TYPE_INCREMENTAL_OPT_VAL,以及設定了時間戳的起始時間。查詢結果為:

可以看到查詢到的資料僅為上次commit 後的資料。

當然,我們也可以指定時間段內的資料進行查詢,指定下面選項即可:

 option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
 option(END_INSTANTTIME_OPT_KEY, endTime).

Hudi CLI

最後我們看下一下 Hudi CLI

// 啟動 hudi cli:

/usr/lib/hudi/cli/bin/hudi-cli.sh

// 連線hudi 資料表

connect --path s3://xxxx/hudi/hudi_table

接下來我們可以檢視提交過的 commit:

甚至回滾 commit:

commit rollback --commit 20191122073858

回滾後再次對 hive 表執行查詢:

spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()

+------+-----------------+

| id| name|

+------+-----------------+

|539931|Tiger Point Gully|

+------+-----------------+

可以看到之前更新的資料已經被刪除。

在 Hudi Cli 下,我們也可以建立表(create)、列出commit時檔案級別的資訊(commit showfiles)等。更多 Hudi cli 的用法,可以在 Hudi Cli 下輸入 help 獲取更多資訊。

References:

Apache Hudi 官方介紹:https://hudi.apache.org/index.html

Apache Hudi Quick Start:https://hudi.apache.org/quickstart.html

Apache Hudi CLI:https://hudi.apache.org/admin_guide.html

轉自:https://www.cnblogs.com/zackstang/p/11912994.html