1. 程式人生 > >Apache Hudi + AWS S3 + Athena實戰

Apache Hudi + AWS S3 + Athena實戰

Apache Hudi在阿里巴巴集團、EMIS Health,LinkNovate,[Tathastu.AI](http://tathastu.ai/),騰訊,Uber內使用,並且由Amazon AWS EMR和Google雲平臺支援,最近Amazon Athena支援了在Amazon S3上查詢Apache Hudi資料集的能力,本部落格將測試Athena查詢S3上Hudi格式資料集。 ## 1. 準備-Spark環境,S3 Bucket 需要使用Spark寫入Hudi資料,登陸Amazon EMR並啟動spark-shell: ```shell $ export SCALA_VERSION=2.12 $ export SPARK_VERSION=2.4.4 $ spark-shell \ --packages org.apache.hudi:hudi-spark-bundle_${SCALA_VERSION}:0.5.3,org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}\ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' ... Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242) Type in expressions to have them evaluated. Type :help for more information. scala> ``` 接著使用如下scala程式碼設定表名,基礎路徑以及資料生成器來生成資料。這裡設定`basepath`為`s3://hudi_athena_test/hudi_trips`,以便後面進行查詢 ```scala import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ val tableName = "hudi_trips" val basePath = "s3://hudi_athena_test/hudi_trips" val dataGen = new DataGenerator ``` ## 2. 插入資料 生成新的行程資料,匯入DataFrame,並將其寫入Hudi表 ```scala val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath) ``` ## 3. 建立Athena資料庫/表 Hudi內建表分割槽支援,所以在建立表後需要新增分割槽,安裝`athenareader`工具,其提供Athena多個查詢和其他有用的特性。 ```shell go get -u github.com/uber/athenadriver/athenareader ``` 接著建立`hudi_athena_test.sql`檔案,內容如下 ```sql DROP DATABASE IF EXISTS hudi_athena_test CASCADE; create database hudi_athena_test; CREATE EXTERNAL TABLE `trips`( `begin_lat` double, `begin_lon` double, `driver` string, `end_lat` double, `end_lon` double, `fare` double, `rider` string, `ts` double, `uuid` string ) PARTITIONED BY (`partitionpath` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://hudi_athena_test/hudi_trips' ALTER TABLE trips ADD PARTITION (partitionpath = 'americas/united_states/san_francisco') LOCATION 's3://hudi_athena_test/hudi_trips/americas/united_states/san_francisco' PARTITION (partitionpath = 'americas/brazil/sao_paulo') LOCATION 's3://hudi_athena_test/hudi_trips/americas/brazil/sao_paulo' PARTITION (partitionpath = 'asia/india/chennai') LOCATION 's3://hudi_athena_test/hudi_trips/asia/india/chennai' ``` 使用如下命令執行SQL語句 ```shell $ athenareader -q hudi_athena_test.sql ``` ## 4. 使用Athena查詢Hudi 如果沒有錯誤,那麼說明庫和表在Athena中都已建立好,因此可以在Athena中查詢Hudi資料集,使用`athenareader`查詢結果如下 ```shell athenareader -q "select * from trips" -o markdown ``` ![](https://img2020.cnblogs.com/blog/616953/202008/616953-20200803192420348-1145023424.png) 也可以帶條件進行查詢 ```shell athenareader -q "select fare,rider from trips where fare>20" -o markdown ``` ![](https://img2020.cnblogs.com/blog/616953/202008/616953-20200803192433882-1559031823.png) ## 5. 更新Hudi表再次查詢 Hudi支援S3中的資料,回到spark-shell並使用如下命令更新部分資料 ```scala val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) ``` 執行完成後,使用`athenareader`再次查詢 ```shell athenareader -q "select * from trips" -o markdown ``` 可以看到資料已經更新了 ![](https://img2020.cnblogs.com/blog/616953/202008/616953-20200803192446759-1802150395.png) ## 6. 限制 Athena不支援查詢快照或增量查詢,Hive/SparkSQL支援,為進行驗證,通過spark-shell建立一個快照 ```scala spark. read. format("hudi"). load(basePath + "/*/*/*/*"). createOrReplaceTempView("hudi_trips_snapshot") ``` 使用如下程式碼查詢 ```scala val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) ``` 使用Athena查詢將會失敗,因為沒有物化 ```shell $ athenareader -q "select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime" SYNTAX_ERROR: line 1:57: Table awsdatacatalog.hudi_athena_test.hudi_trips_snapshot does not exist ``` 根據官方文件,Athena支援查詢Hudi資料集的Read-Optimized檢視,同時,我們可以通過Athena來建立檢視並進行查詢,使用Athena在Hudi表上建立一個檢視 ```shell $ athenareader -q "create view fare_greater_than_40 as select * from trips where fare>40" -a ``` 查詢檢視 ```shell $ athenareader -q "select fare,rider from fare_greater_than_40" FARE RIDER 43.4923811219014 rider-213 63.72504913279929 rider-284 90.25710109008239 rider-284 93.56018115236618 rider-213 49.527694252432056 rider-284 90.9053809533154 rider-284 98.3428192817987 rider-