1. 程式人生 > >寫入Apache Hudi資料集

寫入Apache Hudi資料集

這一節我們將介紹使用DeltaStreamer工具從外部源甚至其他Hudi資料集攝取新更改的方法,
以及通過使用Hudi資料來源的upserts加快大型Spark作業的方法。
對於此類資料集,我們可以使用各種查詢引擎查詢它們。

寫操作

在此之前,瞭解Hudi資料來源及delta streamer工具提供的三種不同的寫操作以及如何最佳利用它們可能會有所幫助。
這些操作可以在針對資料集發出的每個提交/增量提交中進行選擇/更改。

  • UPSERT(插入更新) :這是預設操作,在該操作中,通過查詢索引,首先將輸入記錄標記為插入或更新。
    在執行啟發式方法以確定如何最好地將這些記錄放到儲存上,如優化檔案大小之類後,這些記錄最終會被寫入。
    對於諸如資料庫更改捕獲之類的用例,建議該操作,因為輸入幾乎肯定包含更新。
  • INSERT(插入) :就使用啟發式方法確定檔案大小而言,此操作與插入更新(UPSERT)非常相似,但此操作完全跳過了索引查詢步驟。
    因此,對於日誌重複資料刪除等用例(結合下面提到的過濾重複項的選項),它可以比插入更新快得多。
    插入也適用於這種用例,這種情況資料集可以允許重複項,但只需要Hudi的事務寫/增量提取/儲存管理功能。
  • BULK_INSERT(批插入) :插入更新和插入操作都將輸入記錄儲存在記憶體中,以加快儲存優化啟發式計算的速度(以及其它未提及的方面)。
    所以對Hudi資料集進行初始載入/引導時這兩種操作會很低效。批量插入提供與插入相同的語義,但同時實現了基於排序的資料寫入演算法,
    該演算法可以很好地擴充套件數百TB的初始負載。但是,相比於插入和插入更新能保證檔案大小,批插入在調整檔案大小上只能盡力而為。

DeltaStreamer

HoodieDeltaStreamer實用工具 (hudi-utilities-bundle中的一部分) 提供了從DFS或Kafka等不同來源進行攝取的方式,並具有以下功能。

  • 從Kafka單次攝取新事件,從Sqoop、HiveIncrementalPuller輸出或DFS資料夾中的多個檔案
    增量匯入
  • 支援json、avro或自定義記錄型別的傳入資料
  • 管理檢查點,回滾和恢復
  • 利用DFS或Confluent schema登錄檔的Avro模式。
  • 支援自定義轉換操作

命令列選項更詳細地描述了這些功能:

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help
Usage: <main class> [options]
  Options:
    --commit-on-errors
        Commit even when some records failed to be written
      Default: false
    --enable-hive-sync
          Enable syncing to hive
       Default: false
    --filter-dupes
          Should duplicate records from source be dropped/filtered outbefore 
          insert/bulk-insert 
      Default: false
    --help, -h
    --hudi-conf
          Any configuration that can be set in the properties file (using the CLI 
          parameter "--propsFilePath") can also be passed command line using this 
          parameter 
          Default: []
    --op
      Takes one of these values : UPSERT (default), INSERT (use when input is
      purely new data/inserts to gain speed)
      Default: UPSERT
      Possible Values: [UPSERT, INSERT, BULK_INSERT]
    --payload-class
      subclass of HoodieRecordPayload, that works off a GenericRecord.
      Implement your own, if you want to do something other than overwriting
      existing value
      Default: org.apache.hudi.OverwriteWithLatestAvroPayload
    --props
      path to properties file on localfs or dfs, with configurations for
      Hudi client, schema provider, key generator and data source. For
      Hudi client props, sane defaults are used, but recommend use to
      provide basic things like metrics endpoints, hive configs etc. For
      sources, referto individual classes, for supported properties.
      Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties
    --schemaprovider-class
      subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
      schemas to input & target table data, built in options:
      FilebasedSchemaProvider
      Default: org.apache.hudi.utilities.schema.FilebasedSchemaProvider
    --source-class
      Subclass of org.apache.hudi.utilities.sources to read data. Built-in
      options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
      AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}
      Default: org.apache.hudi.utilities.sources.JsonDFSSource
    --source-limit
      Maximum amount of data to read from source. Default: No limit For e.g:
      DFSSource => max bytes to read, KafkaSource => max events to read
      Default: 9223372036854775807
    --source-ordering-field
      Field within source record to decide how to break ties between records
      with same key in input data. Default: 'ts' holding unix timestamp of
      record
      Default: ts
    --spark-master
      spark master to use.
      Default: local[2]
  * --target-base-path
      base path for the target Hudi dataset. (Will be created if did not
      exist first time around. If exists, expected to be a Hudi dataset)
  * --target-table
      name of the target table in Hive
    --transformer-class
      subclass of org.apache.hudi.utilities.transform.Transformer. UDF to
      transform raw source dataset to a target dataset (conforming to target
      schema) before writing. Default : Not set. E:g -
      org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
      allows a SQL query template to be passed as a transformation function)

該工具採用層次結構組成的屬性檔案,並具有可插拔的介面,用於提取資料、生成金鑰和提供模式。
從Kafka和DFS攝取資料的示例配置在這裡:hudi-utilities/src/test/resources/delta-streamer-config

例如:當您讓Confluent Kafka、Schema登錄檔啟動並執行後,可以用這個命令產生一些測試資料
(impressions.avro,
由schema-registry程式碼庫提供)

[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid

然後用如下命令攝取這些資料。

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
  --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field impresssiontime \
  --target-base-path file:///tmp/hudi-deltastreamer-op --target-table uber.impressions \
  --op BULK_INSERT

在某些情況下,您可能需要預先將現有資料集遷移到Hudi。 請參考遷移指南。

Datasource Writer

hudi-spark模組提供了DataSource API,可以將任何資料幀寫入(也可以讀取)到Hudi資料集中。
以下是在指定需要使用的欄位名稱的之後,如何插入更新資料幀的方法,這些欄位包括
recordKey => _row_keypartitionPath => partitionprecombineKey => timestamp

inputDF.write()
       .format("org.apache.hudi")
       .options(clientOpts) // 可以傳入任何Hudi客戶端引數
       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
       .option(HoodieWriteConfig.TABLE_NAME, tableName)
       .mode(SaveMode.Append)
       .save(basePath);

與Hive同步

上面的兩個工具都支援將資料集的最新模式同步到Hive Metastore,以便查詢新的列和分割槽。
如果需要從命令列或在獨立的JVM中執行它,Hudi提供了一個HiveSyncTool
在構建了hudi-hive模組之後,可以按以下方式呼叫它。

cd hudi-hive
./run_sync_tool.sh
 [hudi-hive]$ ./run_sync_tool.sh --help
Usage: <main class> [options]
  Options:
  * --base-path
       Basepath of Hudi dataset to sync
  * --database
       name of the target database in Hive
    --help, -h
       Default: false
  * --jdbc-url
       Hive jdbc connect url
  * --pass
       Hive password
  * --table
       name of the target table in Hive
  * --user
       Hive username

刪除資料

通過允許使用者指定不同的資料記錄負載實現,Hudi支援對儲存在Hudi資料集中的資料執行兩種型別的刪除。

  • Soft Deletes(軟刪除) :使用軟刪除時,使用者希望保留鍵,但僅使所有其他欄位的值都為空。
    通過確保適當的欄位在資料集模式中可以為空,並在將這些欄位設定為null之後直接向資料集插入更新這些記錄,即可輕鬆實現這一點。
  • Hard Deletes(硬刪除) :這種更強形式的刪除是從資料集中徹底刪除記錄在儲存上的任何痕跡。
    這可以通過觸發一個帶有自定義負載實現的插入更新來實現,這種實現可以使用總是返回Optional.Empty作為組合值的DataSource或DeltaStreamer。
    Hudi附帶了一個內建的org.apache.hudi.EmptyHoodieRecordPayload類,它就是實現了這一功能。
 deleteDF // 僅包含要刪除的記錄的資料幀
   .write().format("org.apache.hudi")
   .option(...) // 根據設定需要新增HUDI引數,例如記錄鍵、分割槽路徑和其他引數
   // 指定record_key,partition_key,precombine_fieldkey和常規引數
   .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
 

儲存管理

Hudi還對儲存在Hudi資料集中的資料執行幾個關鍵的儲存管理功能。在DFS上儲存資料的關鍵方面是管理檔案大小和數量以及回收儲存空間。
例如,HDFS在處理小檔案上效能很差,這會對Name Node的記憶體及RPC施加很大的壓力,並可能破壞整個叢集的穩定性。
通常,查詢引擎可在較大的列檔案上提供更好的效能,因為它們可以有效地攤銷獲得列統計資訊等的成本。
即使在某些雲資料儲存上,列出具有大量小檔案的目錄也常常比較慢。

以下是一些有效管理Hudi資料集儲存的方法。

  • Hudi中的小檔案處理功能,可以分析傳入的工作負載並將插入內容分配到現有檔案組中,
    而不是建立新檔案組。新檔案組會生成小檔案。
  • 可以配置Cleaner來清理較舊的檔案片,清理的程度可以調整,
    具體取決於查詢所需的最長時間和增量拉取所需的回溯。
  • 使用者還可以調整基礎/parquet檔案、日誌檔案的大小
    和預期的壓縮率,使足夠數量的插入被分到同一個檔案組中,最終產生大小合適的基礎檔案。
  • 智慧調整批插入並行度,可以產生大小合適的初始檔案組。
    實際上,正確執行此操作非常關鍵,因為檔案組一旦建立後就不能刪除,只能如前所述對其進行擴充套件。
  • 對於具有大量更新的工作負載,讀取時合併儲存提供了一種很好的機制,
    可以快速將其攝取到較小的檔案中,之後通過壓縮將它們合併為較大的基礎檔案。

相關推薦

寫入Apache Hudi資料

這一節我們將介紹使用DeltaStreamer工具從外部源甚至其他Hudi資料集攝取新更改的方法, 以及通過使用Hudi資料來源的upserts加快大型Spark作業的方法。 對於此類資料集,我們可以使用各種查詢引擎查詢它們。 寫操作 在此之前,瞭解Hudi資料來源及delta streamer工具提供的三種

實戰 | 將Apache Hudi資料寫入阿里雲OSS

### 1. 引入 雲上物件儲存的廉價讓不少公司將其作為主要的儲存方案,而Hudi作為資料湖解決方案,支援物件儲存也是必不可少。之前AWS EMR已經內建整合Hudi,也意味著可以在S3上無縫使用Hudi。當然國內使用者可能更多使用阿里雲OSS作為雲上儲存方案,那麼如果使用者想基於OSS構建資料湖,那麼Hu

官宣!AWS Athena正式可查詢Apache Hudi資料

## 1. 引入 Apache Hudi是一個開源的增量資料處理框架,提供了行級insert、update、upsert、delete的細粒度處理能力(`Upsert`表示如果資料集中存在記錄就更新;否則插入)。 Hudi處理資料插入和更新,不會建立太多的小檔案(小檔案會導致查詢端效能降低),Apache

Apache Drill 1.15.0 釋出,大型資料分析系統

   Apache Drill 1.15.0 釋出了,Drill 是一個大資料 SQL 查詢引擎,其在基於 SQL 的資料分析和商業智慧(BI)上引入了 JSON 檔案模型,使得使用者能查詢固定架構、演化架構,以及各種格式和資料儲存中的模式無關(schema-free

java利用poi開源庫實現將資料寫入Excel表格並儲存在本地

一,目前主流的關於讀寫excel表格的有poi 和jxl開源庫,這裡只是簡單的介紹如何poi將資料集寫進Excel表格,並存進本地。 二,官網下載poi的相關jar包,網址 http://poi.apache.org/download.html#POI-4.0.1 &nb

使用Amazon EMR和Apache Hudi在S3上插入,更新,刪除資料

將資料儲存在Amazon S3中可帶來很多好處,包括規模、可靠性、成本效率等方面。最重要的是,你可以利用Amazon EMR中的Apache Spark,Hive和Presto之類的開源工具來處理和分析資料。 儘管這些工具功能強大,但是在處理需要進行增量資料處理以及記錄級別插入,更新和刪除場景時,仍然非常具有

Apache Hudi又雙叕被國內頂級雲服務提供商成了!

> 是的,最近國內雲服務提供商騰訊雲在其EMR-V2.2.0版本中優先集成了Hudi 0.5.1版本作為其雲上的資料湖解決方案對外提供服務 Apache Hudi 在 HDFS 的資料集上提供了插入更新和增量拉取的流原語。 一般來說,我們會將大量資料儲存到 HDFS,新資料增量寫入,而舊資料鮮有改動,特別

Uber基於Apache Hudi構建PB級資料湖實踐

## 1. 引言 從確保準確預計到達時間到預測最佳交通路線,在Uber平臺上提供安全、無縫的運輸和交付體驗需要可靠、高效能的大規模資料儲存和分析。2016年,Uber開發了增量處理框架Apache Hudi,以低延遲和高效率為關鍵業務資料管道賦能。一年後,我們開源了該解決方案,以使得其他有需要的組織也可以利

使用Apache Spark和Apache Hudi構建分析資料

## 1. 引入 大多數現代資料湖都是基於某種分散式檔案系統(DFS),如HDFS或基於雲的儲存,如AWS S3構建的。遵循的基本原則之一是檔案的“一次寫入多次讀取”訪問模型。這對於處理海量資料非常有用,如數百GB到TB的資料。 但是在構建分析資料湖時,更新資料並不罕見。根據不同場景,這些更新頻率可能是每

使用Apache Hudi構建大規模、事務性資料

一個近期由Hudi PMC & Uber Senior Engineering Manager Nishith Agarwal分享的Talk ![](https://img2020.cnblogs.com/blog/616953/202006/616953-20200627212325463-3835958

Apache Hudi助力nClouds加速資料交付

### 1. 概述 在[nClouds](https://www.nclouds.com/services/data-and-analytics-services)上,當客戶的業務決策取決於對近實時資料的訪問時,客戶通常會向我們尋求有關資料和分析平臺的解決方案。但隨著每天建立和收集的資料量都在增加,這使得使

使用Apache Hudi + Amazon S3 + Amazon EMR + AWS DMS構建資料

### 1. 引入 資料湖使組織能夠在更短的時間內利用多個源的資料,而不同角色使用者可以以不同的方式協作和分析資料,從而實現更好、更快的決策。Amazon Simple Storage Service(amazon S3)是針對結構化和非結構化資料的高效能物件儲存服務,可以用來作為資料湖底層的儲存服務。

通過Apache Hudi和Alluxio建設高效能資料

T3出行的楊華和張永旭描述了他們資料湖架構的發展。該架構使用了眾多開源技術,包括Apache Hudi和Alluxio。在本文中,您將看到我們如何使用Hudi和Alluxio將資料攝取時間縮短一半。此外,資料分析人員如何使用Presto、Hudi和Alluxio讓查詢速度提高了10倍。我們基於資料編排為資料管

資料湖框架選型很糾結?一文了解Apache Hudi核心優勢

![](https://img2020.cnblogs.com/blog/616953/202101/616953-20210101095826047-596914080.png) 英文原文:https://hudi.apache.org/blog/hudi-indexing-mechanisms/ A

資料湖-Apache Hudi

## Hudi特性 - 資料湖處理非結構化資料、日誌資料、結構化資料 - 支援較快upsert/delete, 可插入索引 - Table Schema - 小檔案管理Compaction - ACID語義保證,多版本保證 並具有回滾功能 - savepoint 使用者資料恢復的儲存點 - 支援

Apache狀態監測重啟

用戶 ron art ash lin 可執行 con net linux服務 註:此文摘自:http://www.111cn.net/sys/linux/63718.htm 原理:通過服務器本地訪問自身Apache服務(與用戶訪問網站類似),如超過15s沒有返回正常的220

apache+svn+ldap

use provide auth code ces word acc 需要 pac apache+svn搭建方式如下:http://www.cnblogs.com/uglyliu/p/6914056.html SVN和ldap集成,我用的方式只需要更改 /etc/http/

[2] SSD配置+訓練VOC0712+訓練自己的資料

GitHub https://github.com/weiliu89/caffe/tree/ssd http://blog.csdn.net/u010733679/article/details/52125597 一、安裝配置 sudo apt-get install -y

[Bug]ArcGIS10.1 for Desktop為SDE柵格資料建立金字塔只能建立level1的問題

導語 前兩天去客戶那邊交流,客戶提出一個問題,我的ArcGIS10.1 for Desktop,我並沒有使用Esri提供的RasterDataset、RasterCatalog、MosaicDataset儲存模型,直接將tiff資料匯入到ArcSDE地理資料庫中,第一次匯入已經預設建立了金字塔

Spotfire使用經驗——使用文件屬性(Document Property)實現資料範圍的動態限定

需求描述: 由於業務系統將被升級,現有的Spotfire報表的資料呈現將會收到影響。因此,使用者希望能夠在Spotfire中選擇檢視不同時間範圍內的資料,即,業務系統升級前的資料和升級後的資料。 解決方法: 使用者提出的方法是,建立兩份Spotfire報表。我建議使用者最好不要這樣做