Apache Hudi 介紹與應用
Apache Hudi
Apache Hudi 在基於 HDFS/S3 資料儲存之上,提供了兩種流原語:
- 插入更新
- 增量拉取
一般來說,我們會將大量資料儲存到HDFS/S3,新資料增量寫入,而舊資料鮮有改動,特別是在經過資料清洗,放入資料倉庫的場景。而且在資料倉庫如 hive中,對於update的支援非常有限,計算昂貴。另一方面,若是有僅對某段時間內新增資料進行分析的場景,則hive、presto、hbase等也未提供原生方式,而是需要根據時間戳進行過濾分析。
在此需求下,Hudi可以提供這兩種需求的實現。第一個是對record級別的更新,另一個是僅對增量資料的查詢。且Hudi提供了對Hive、presto、Spark的支援,可以直接使用這些元件對Hudi管理的資料進行查詢。
儲存型別
我們看一下 Hudi 的兩種儲存型別:
- 寫時複製(copy on write):僅使用列式檔案(parquet)儲存資料。在寫入/更新資料時,直接同步合併原檔案,生成新版本的基檔案(需要重寫整個列資料檔案,即使只有一個位元組的新資料被提交)。此儲存型別下,寫入資料非常昂貴,而讀取的成本沒有增加,所以適合頻繁讀的工作負載,因為資料集的最新版本在列式檔案中始終可用,以進行高效的查詢。
- 讀時合併(merge on read):使用列式(parquet)與行式(avro)檔案組合,進行資料儲存。在更新記錄時,更新到增量檔案中(avro),然後進行非同步(或同步)的compaction,建立列式檔案(parquet)的新版本。此儲存型別適合頻繁寫的工作負載,因為新記錄是以appending 的模式寫入增量檔案中。但是在讀取資料集時,需要將增量檔案與舊檔案進行合併,生成列式檔案。
檢視
在瞭解這兩種儲存型別後,我們再看一下Hudi支援的儲存資料的檢視(也就是查詢模式):
- 讀優化檢視(Read Optimized view):直接query 基檔案(資料集的最新快照),也就是列式檔案(如parquet)。相較於非Hudi列式資料集,有相同的列式查詢效能
- 增量檢視(Incremental View):僅query新寫入資料集的檔案,也就是指定一個commit/compaction,query此之後的新資料。
- 實時檢視(Real-time View):query最新基檔案與增量檔案。此檢視通過將最新的基檔案(parquet)與增量檔案(avro)進行動態合併,然後進行query。可以提供近實時的資料(會有幾分鐘的延遲)
在以上3種檢視中,“讀優化檢視”與“增量檢視”均可在“寫時複製”與“讀時合併”的儲存型別下使用。而“實時檢視“僅能在”讀時合併“模式下使用。
儲存型別 |
支援的檢視 |
寫時複製 |
讀優化 + 增量 |
讀時合併 |
讀優化 + 增量 + 近實時 |
時間軸
最後介紹一下 Hudi 的核心 —— 時間軸。Hudi 會維護一個時間軸,在每次執行操作時(如寫入、刪除、合併等),均會帶有一個時間戳。通過時間軸,可以實現在僅查詢某個時間點之後成功提交的資料,或是僅查詢某個時間點之前的資料。這樣可以避免掃描更大的時間範圍,並非常高效地只消費更改過的檔案(例如在某個時間點提交了更改操作後,僅query某個時間點之前的資料,則仍可以query修改前的資料)。
使用案例
下面我們嘗試使用Hudi API 進行讀寫。
寫入資料
首先準備資料集,部分條目為:
1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
…
啟動spark-shell,並指定hudi jar包:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
載入指定包:
1 2 3 4 5 6 |
import scala.collection.JavaConversions. _
import org.apache.spark.sql.SaveMode. _
import org.apache.hudi.DataSourceReadOptions. _
import org.apache.hudi.DataSourceWriteOptions. _
import org.apache.hudi.config.HoodieWriteConfig. _
import org.apache.hudi.hive.MultiPartKeysValueExtractor
|
指定建立的Hudi表名與路徑:
1 2 |
val tableName = "hudi_table"
val basePath = "s3://xxxx/xxx"
|
構造 DataFrame:
1 2 3 4 |
val lineRDD = sc.textFile( "features.txt" ).map( _ .split( "\\|" )).filter( _ .length > 6 )
case class Record(id : Int, name : String, c _ class : String, state : String, latitude : Float, longitude : String, elevation : Int)
val RecordRDD = lineRDD.map(x = >Record(x( 0 ).toInt, x( 1 ), x( 2 ), x( 3 ), x( 4 ).toFloat, x( 5 ), x( 6 ).toInt))
val featureDF = RecordRDD.toDF
|
插入資料到 Hudi(以及Hive):
1 2 3 4 5 6 7 8 9 10 11 |
featureDF.write.format( "org.apache.hudi" ).
option(RECORDKEY _ FIELD _ OPT _ KEY, "c_class" ).
option(PARTITIONPATH _ FIELD _ OPT _ KEY, "state" ).
option(PRECOMBINE _ FIELD _ OPT _ KEY, "id" ).
option(TABLE _ NAME, tableName).
option(HIVE _ SYNC _ ENABLED _ OPT _ KEY, "true" ).
option(HIVE _ TABLE _ OPT _ KEY, "hivehudi" ).
option(HIVE _ PARTITION _ FIELDS _ OPT _ KEY, "state" ).
option(HIVE _ PARTITION _ EXTRACTOR _ CLASS _ OPT _ KEY, classOf[MultiPartKeysValueExtractor].getName).
mode(Overwrite).
save(basePath);
|
我們可以看到目錄結構類似於 Hive:
hudi/hudi_table/AR/44bfae35-056b-4bcd-8970-5f1271c3845d-0_18-215-89206_20191121100011.parquet
hudi/hudi_table/CA/2a591ee9-afa4-48d9-bd16-63376a1b8e06-0_38-215-89226_20191121100011.parquet
hudi/hudi_table/CT/911510f9-0655-405f-afad-be9c15429e81-0_46-215-89234_20191121100011.parquet
…
表名為hudi_table,分割槽鍵為 state,真正儲存資料的檔案為parquet。
查詢資料
首先載入資料格式:
1 |
val toViewDF = spark.read.format( "org.apache.hudi" ).load(basePath + "/*/*" )
|
我們在上面插入資料的時候,同時建立了 Hive 表,所以有以下兩種方式做查詢:
- 直接查詢 Hive 表:
spark.sql("select name from hivehudi where c_class='Summit'").show()
+--------------------+
| name|
+--------------------+
| High Knob|
| White Rock Mountain|
| Open Mine Hill|
…
2. 使用臨時表:
roViewDF.registerTempTable("hudi_ro_table")
spark.sql("select id,name from hudi_ro_table where c_class='Stream'").show()
+-------+--------------------+
| id| name|
+-------+--------------------+
| 539931| Tiger Point Gully|
| 871801| Dry Brook|
| 847407| McClusky Creek|
| 637687| Shaw Drain|
| 749747| Duncan Creek|
|1502779| Brushy Lick|
…
更新資料
首先我們看一條資料:
spark.sql("select id,name from hudi_ro_table where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
然後更新此資料(更新的資料儲存在一個新的原始檔中):
1 2 3 4 5 6 7 8 9 10 11 12 13 |
val updateRDD = sc.textFile( "update.txt" ).map( _ .split( "\\|" )).filter( _ .length> 6 )
val updateDF = updateRDD.map(x = >Record(x( 0 ).toInt, x( 1 ), x( 2 ), x( 3 ), x( 4 ).toFloat, x( 5 ), x( 6 ).toInt)).toDF
updateDF.write.format( "org.apache.hudi" ).
option(RECORDKEY _ FIELD _ OPT _ KEY, "c_class" ).
option(PARTITIONPATH _ FIELD _ OPT _ KEY, "state" ).
option(PRECOMBINE _ FIELD _ OPT _ KEY, "id" ).
option(TABLE _ NAME, tableName).
option(HIVE _ SYNC _ ENABLED _ OPT _ KEY, "true" ).
option(HIVE _ TABLE _ OPT _ KEY, "hivehudi" ).
option(HIVE _ PARTITION _ FIELDS _ OPT _ KEY, "state" ).
option(HIVE _ PARTITION _ EXTRACTOR _ CLASS _ OPT _ KEY, classOf[MultiPartKeysValueExtractor].getName).
mode(Append).
save(basePath);
|
可以看到我們這裡使用的模式由Overwrite 改為了 Append,也就是追加的模式,其餘的基本不變。我們首先分別看一下 hive 表與 hudi 表中的資料變化。
Hive 表中:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
Hudi 表中:
1 2 3 |
val appViewDF = spark.read.format( "org.apache.hudi" ).load(basePath + "/*/*" )
appViewDF.registerTempTable( "hudi_update_table" )
spark.sql( "select id,name from hudi_update_table where c_class='Stream' and id=539931" ).show()
|
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
可以看到均可以查到更新後的資料。
對資料執行 select * 加上過濾條件:
可以看到表中有2個比較有意思的欄位,分別為:_hoodie_commit_time, _hoodie_commit_seqno
上文我們提到過 Hudi 有一個核心為時間軸,每次執行一個commit時,都會生成一個時間戳。這裡 _hoodie_commit_time 即記錄了commit 的時間戳。進一步的,Hudi 便是基於此實現了增量查詢。
下面我們嘗試一下增量查詢:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// 獲取 commit 時間戳
val commits = spark.sql( "select distinct(_hoodie_commit_time) as commitTime from hudi_update_table order by commitTime" ).map(k = > k.getString( 0 )).take( 3 )
// 設定起始時間戳為上次時間戳
val beginTime = commits(commits.length - 2 )
// 增量查詢
val incViewDF = spark.
read.
format( "org.apache.hudi" ).
option(VIEW _ TYPE _ OPT _ KEY, VIEW _ TYPE _ INCREMENTAL _ OPT _ VAL).
option(BEGIN _ INSTANTTIME _ OPT _ KEY, beginTime).
load(basePath); < br >
incViewDF.registerTempTable( "hudi_incr_table" )
spark.sql( "select * from hudi_incr_table where c_class='Stream' and id=539931" ).show()
|
這裡我們使用增量查詢的選項 VIEW_TYPE_INCREMENTAL_OPT_VAL,以及設定了時間戳的起始時間。查詢結果為:
可以看到查詢到的資料僅為上次commit 後的資料。
當然,我們也可以指定時間段內的資料進行查詢,指定下面選項即可:
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
Hudi CLI
最後我們看下一下 Hudi CLI
// 啟動 hudi cli:
/usr/lib/hudi/cli/bin/hudi-cli.sh
// 連線hudi 資料表
connect --path s3://xxxx/hudi/hudi_table
接下來我們可以檢視提交過的 commit:
甚至回滾 commit:
commit rollback --commit 20191122073858
回滾後再次對 hive 表執行查詢:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
+------+-----------------+
可以看到之前更新的資料已經被刪除。
在 Hudi Cli 下,我們也可以建立表(create)、列出commit時檔案級別的資訊(commit showfiles)等。更多 Hudi cli 的用法,可以在 Hudi Cli 下輸入 help 獲取更多資訊。
References:
Apache Hudi 官方介紹:https://hudi.apache.org/index.html
Apache Hudi Quick Start:https://hudi.apache.org/quickstart.html
Apache Hudi CLI:https://hudi.apache.org/admin_guide.html
轉自:https://www.cnblogs.com/zackstang/p/11912994.html