1. 程式人生 > >使用Amazon EMR和Apache Hudi在S3上插入,更新,刪除資料

使用Amazon EMR和Apache Hudi在S3上插入,更新,刪除資料

將資料儲存在Amazon S3中可帶來很多好處,包括規模、可靠性、成本效率等方面。最重要的是,你可以利用Amazon EMR中的Apache Spark,Hive和Presto之類的開源工具來處理和分析資料。 儘管這些工具功能強大,但是在處理需要進行增量資料處理以及記錄級別插入,更新和刪除場景時,仍然非常具有挑戰。

與客戶交談時,我們發現有些場景需要處理對單條記錄的增量更新,例如:

  • 遵守資料隱私法規,在該法規中,使用者選擇忘記或更改應用程式對資料使用方式的協議。
  • 使用流資料,當你必須要處理特定的資料插入和更新事件時。
  • 實現變更資料捕獲(CDC)架構來跟蹤和提取企業資料倉庫或運營資料儲存中的資料庫變更日誌。
  • 恢復遲到的資料,或分析特定時間點的資料。

從今天開始,EMR 5.28.0版包含Apache Hudi(孵化中),因此你不再需要構建自定義解決方案來執行記錄級別的插入,更新和刪除操作。Hudi是Uber於2016年開始開發,以解決攝取和ETL管道效率低下的問題。最近幾個月,EMR團隊與Apache Hudi社群緊密合作,提供了一些補丁,包括將Hudi更新為Spark 2.4.4,支援Spark Avro,增加了對AWS Glue Data Catalog的支援,以及多個缺陷修復。

使用Hudi,即可以在S3上執行記錄級別的插入,更新和刪除,從而使你能夠遵守資料隱私法律、消費實時流、捕獲更新的資料、恢復遲到的資料和以開放的、供應商無關的格式跟蹤歷史記錄和回滾。 建立資料集和表,然後Hudi管理底層資料格式。Hudi使用Apache Parquet和Apache Avro進行資料儲存,並內建整合Spark,Hive和Presto,使你能夠使用與現在所使用的相同工具來查詢Hudi資料集,並且幾乎實時地訪問新資料。

啟動EMR群集時,只要選擇以下元件之一(Hive,Spark,Presto),就可以自動安裝和配置Hudi的庫和工具。你可以使用Spark建立新的Hudi資料集,以及插入,更新和刪除資料。每個Hudi資料集都會在叢集的已配置元儲存庫(包括AWS Glue Data Catalog)中進行註冊,並顯示為可以通過Spark,Hive和Presto查詢的表。

Hudi支援兩種儲存型別,這些儲存型別定義瞭如何寫入,索引和從S3讀取資料:

  • 寫時複製(Copy On Write)– 資料以列格式(Parquet)儲存,並且在寫入時更新資料資料會建立新版本檔案。此儲存型別最適合用於讀取繁重的工作負載,因為資料集的最新版本在高效的列式檔案中始終可用。

  • 讀時合併(Merge On Read)– 將組合列(Parquet)格式和基於行(Avro)格式來儲存資料; 更新記錄至基於行的增量檔案中,並在以後進行壓縮,以建立列式檔案的新版本。 此儲存型別最適合於繁重的寫工作負載,因為新提交(commit)會以增量檔案格式快速寫入,但是要讀取資料集,則需要將壓縮的列檔案與增量檔案合併。

下面讓我們快速預覽下如何在EMR叢集中設定和使用Hudi資料集。

結合Apache Hudi與Amazon EMR

從EMR控制檯開始建立叢集。在高階選項中,選擇EMR版本5.28.0(第一個包括Hudi的版本)和以下應用程式:Spark,Hive和Tez。在硬體選項中,添加了3個任務節點,以確保有足夠的能力執行Spark和Hive。

群集就緒後,使用在安全性選項中選擇的金鑰對,通過SSH進入主節點並訪問Spark Shell。 使用以下命令來啟動Spark Shell以將其與Hudi一起使用:

$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
              --conf "spark.sql.hive.convertMetastoreParquet=false"
              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

使用以下Scala程式碼將一些示例ELB日誌匯入寫時複製儲存型別的Hudi資料集中:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"
val hudiTableName = "elb_logs_hudi_cow"
val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", 
    HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", 
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", 
    DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", 
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write
       .format("org.apache.hudi")
       .options(hudiOptions)
       .mode(SaveMode.Overwrite)
       .save(hudiTablePath)

在Spark Shell中,現在就可以計算Hudi資料集中的記錄:

scala> inputDF2.count()
res1: Long = 10491958

在選項(options)中,使用了與為叢集中的Hive Metastore整合,以便在預設資料庫(default)中建立表。 通過這種方式,我可以使用Hive查詢Hudi資料集中的資料:

hive> use default;
hive> select count(*) from elb_logs_hudi_cow;
...
OK
10491958

現在可以更新或刪除資料集中的單條記錄。 在Spark Shell中,設定了一些用來查詢更新記錄的變數,並準備用來選擇要更改的列的值的SQL語句:

val requestIpToUpdate = "243.80.62.181"
val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"

執行SQL語句以檢視列的當前值:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_003|
+------------+

然後,選擇並更新記錄:

// Create a DataFrame with a single record and update column value
val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)
                      .withColumn("elb_name", lit("elb_demo_001"))

現在用一種類似於建立Hudi資料集的語法來更新它。 但是這次寫入的DataFrame僅包含一條記錄:

// Write the DataFrame as an update to existing Hudi dataset
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(hudiTablePath)

在Spark Shell中,檢查更新的結果:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

現在想刪除相同的記錄。要刪除它,可在寫選項中傳入了EmptyHoodieRecordPayload有效負載:

// Write the DataFrame with an EmptyHoodieRecordPayload for deleting a record
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
                "org.apache.hudi.EmptyHoodieRecordPayload")
        .mode(SaveMode.Append)
        .save(hudiTablePath)

在Spark Shell中,可以看到該記錄不再可用:

scala> spark.sql(sqlStatement).show()
+--------+                                                                      
|elb_name|
+--------+
+--------+

Hudi是如何管理所有的更新和刪除? 我們可以通過Hudi命令列介面(CLI)連線到資料集,便可以看到這些更改被解釋為提交(commits):

可以看到,此資料集是寫時複製資料集,這意味著每次對記錄進行更新時,包含該記錄的檔案將被重寫以包含更新後的值。 你可以檢視每次提交(commit)寫入了多少記錄。表格的底行描述了資料集的初始建立,上方是單條記錄更新,頂部是單條記錄刪除。

使用Hudi,你可以回滾到每個提交。 例如,可以使用以下方法回滾刪除操作:

hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031

在Spark Shell中,記錄現在回退到更新之後的位置:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

寫入時複製是預設儲存型別。 通過將其新增到我們的hudiOptions中,我們可以重複上述步驟來建立和更新讀時合併資料集型別:

DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"

如果更新讀時合併資料集並使用Hudi CLI檢視提交(commit)時,則可以看到讀時合併寫時複製相比有何不同。使用讀時合併,你僅寫入更新的行,而不像寫時複製一樣寫入整個檔案。這就是為什麼讀時合併對於需要更多寫入或使用較少讀取次數更新或刪除繁重工作負載的用例很有幫助的原因。增量提交作為Avro記錄(基於行的儲存)寫入磁碟,而壓縮資料作為Parquet檔案(列儲存)寫入。為避免建立過多的增量檔案,Hudi會自動壓縮資料集,以便使得讀取儘可能地高效。

建立讀時合併資料集時,將建立兩個Hive表:

  • 第一個表的名稱與資料集的名稱相同。
  • 第二個表的名稱後面附加了字元_rt; _rt字尾表示實時。

查詢時,第一個表返回已壓縮的資料,並不會顯示最新的增量提交。使用此表可提供最佳效能,但會忽略最新資料。查詢實時表會將壓縮的資料與讀取時的增量提交合並,因此該資料集稱為讀時合併。這將導致可以使用最新資料,但會導致效能開銷,並且效能不如查詢壓縮資料。這樣,資料工程師和分析人員可以靈活地在效能和資料新鮮度之間進行選擇。

已可用

EMR 5.28.0的所有地區現在都可以使用此新功能。將Hudi與EMR結合使用無需額外費用。你可以在EMR文件中瞭解更多有關Hudi的資訊。 這個新工具可以簡化你在S3中處理,更新和刪除資料的方式。也讓我們知道你打算將其用於哪些場景!

相關推薦

使用Amazon EMRApache Hudi在S3插入更新刪除資料

將資料儲存在Amazon S3中可帶來很多好處,包括規模、可靠性、成本效率等方面。最重要的是,你可以利用Amazon EMR中的Apache Spark,Hive和Presto之類的開源工具來處理和分析資料。 儘管這些工具功能強大,但是在處理需要進行增量資料處理以及記錄級別插入,更新和刪除場景時,仍然非常具有

一步一步學MySQL----7 插入更新刪除資料

7.1 插入完整資料記錄 語法: insert into table_name(field1,field2,field3,……)values(value1,value2,value3,……); 例如: 我們先建立一張user表,如下: mysq

八、插入更新刪除資料

插入資料基本語法:insert into table_name(column_list) values(value_list);同時插入多條記錄:insert into table_name(column_list) values(value_list1),(value_list2),...,(value_l

第8章 插入更新刪除資料

8.1 插入資料 INSER INTO的各種插入法 8.2 更新資料 UPDATE可以更新表中的記錄。基本句法如下: UPDATE table_name SET column_name1=value1,column_name1=value2,... WHERE(condition);

MariaDB 插入&更新&刪除資料(8)

MariaDB資料庫管理系統是MySQL的一個分支,主要由開源社群在維護,採用GPL授權許可MariaDB的目的是完全相容MySQL,包括API和命令列,MySQL由於現在閉源了,而能輕鬆成為MySQL的代替品.在儲存引擎方面,使用XtraDB來代替MySQL的InnoDB,MariaDB由MySQL的創始人

MySQL-插入更新刪除資料

資料庫通過插入、更新和刪除等方式來改變表中的記錄。插入資料是向表中插入新的記錄,通過insert語句來實現。更新資料時改變表中已經存在的資料,使用update語句來實現。刪除資料是刪除表中不再使用的資料,通過delete語句來實現。 插入資料 插入資料是向表中插入新的記錄。

MySQL資料庫 之 插入更新刪除資料

一、插入資料 MySQL 中使用 insert 語句來向資料庫表中插入新的資料記錄。 ☆ 為表的所有欄位插入資料 insert into tb_name (col_list) values (value_list) 建立一個數據表 person

conda pip 安裝更新刪除

con 控制臺 ins uil update 控制 源碼 pda 解壓 安裝: pip install xxx conda install xxx 更新: pip install --upgrade xxx conda update xxx 刪除: pip uni

tthinkphp5.0的 插入更新刪除查詢方法。

插入 第一種:Db::table('oa_user')->insert(['array'=val,'value'=>key]) 第一種 Db::name('user')->inse

SQL 隨筆記錄 插入更新擷取

-- 正數時: 擷取字串中逗號',' 第一次出現左邊的字串 select SUBSTRING_INDEX(telephone,',', 1) as functionA from user -- 負數時: 擷取字串中逗號',' 最後一次出現右邊的字串 select SUBSTRIN

Oracle 觸發器 插入更新刪除,資料同步兩表同步

create or replace trigger UserToTemp after insert or update or delete on user_info for each row declare     integrity_error exception;     errno           

創建.刪除更新獲取數據庫命令

create des 獲取 string eat 獲取數據 hat 建數據庫 echart 創建數據庫: const std::string createImMessageIndex = "CREATE UNIQUE INDEX IF NOT EXISTS sessioni

使用T-SQL語句插入更新刪除數據表

數據庫 sql server sql語言 楊書凡 t-sql語句 在對數據進行管理時,使用SSMS進行數據維護有可視化、方便的優點,但是在批量維護或重復維護時,使用SSMS就不方便了,還容易出錯,這就需要編寫SQL語句對數據庫進行維護SQL和T-SQL SQL是Structu

12W學習筆記——獨立子查詢更新刪除建立視圖

記錄 date 筆記 獨立 https 清空 名單 sel 方式 查詢的學習進入了最後階段了,接下來我就來介紹一下查詢最後的獨立子查詢。 獨立子查詢基本上都是用SELECT FROM WHERE 三個語句。 例:在學院表、專業表中,查詢人文與管理學院下設各專業的簡稱

MySQL數據庫基礎(六)——SQL插入更新刪除操作

SQLMySQL數據庫基礎(六)——SQL插入、更新、刪除操作 一、插入數據 1、為表的所有字段插入數據 使用基本的INSERT語句插入數據要求指定表名稱和插入到新記錄中的值。 INSERT INTO table_name (column_list) VALUES (value_list); insert i

elasticsearch 5.x 系列之六 文檔索引更新查詢刪除流程

取數 獲取 info ast 負載均衡 blog img 選擇 將在 一、elasticsearch index 索引流程 步驟: 客戶端向Node1 發送索引文檔請求 Node1 根據文檔ID(_id字段)計算出該文檔應該屬於shard0,然後請求路由到Node3的P0分

Mybatis的應用2 使用mybits+SpringBoot完成第一個查詢的demo(隨後加增加更新刪除

添加 span string nco 返回 一個數 .config 地址 slf4 首先在mapper下面新建一個mysql.xml mysql.xml <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYP

Spring boot集成Redis(1)—進行增加更新查詢批量刪除等操作

緩存 獲取數據 prope XML ray end 序列 www pin 前言:最近工作中使用到了redis緩存,故分享一點自己總結的東西,這篇文章使用的是StringRedisTemplate進行學習,這裏值的說的是,(1)StringRedisTemplate在進行批量

osg 場景節點(事件更新裁剪)遍歷狀態開啟機制

OSG 顯示引擎為優化其顯示效率,節點的更新遍歷,事件處理遍歷預設情況下是關閉的,內部通過更新計數來控制是否將事件處理,節點更新應用到子場景中。 void Node::setUpdateCallback(Callback* nc); void Node::setEventCallback(Cal

elasticsearch 索引更新增量更新不停機修改升級 ,簡單使用

elasticsearch 使用 對es瞭解的並不是太多,記錄簡單使用場景。若哪裡不對,還請大人指出 舉例: 根據商品名稱搜尋商品 建立索引 product_v1 為什麼要帶v1呢,後面方便升級 #新增索引 put product_v1?pretty #檢視一下 get _cat