實戰|使用Spark Streaming寫入Hudi
1. 專案背景
傳統數倉的組織架構是針對離線資料的OLAP(聯機事務分析)需求設計的,常用的匯入資料方式為採用sqoop或spark定時作業逐批將業務庫資料匯入數倉。隨著資料分析對實時性要求的不斷提高,按小時、甚至分鐘級的資料同步越來越普遍。由此展開了基於spark/flink流處理機制的(準)實時同步系統的開發。
然而實時同步數倉從一開始就面臨如下幾個挑戰:
- 小檔案問題。不論是spark的microbatch模式,還是flink的逐條處理模式,每次寫入HDFS時都是幾M甚至幾十KB的檔案。長時間下來產生的大量小檔案,會對HDFS namenode產生巨大的壓力。
- 對update操作的支援。HDFS系統本身不支援資料的修改,無法實現同步過程中對記錄進行修改。
- 事務性。不論是追加資料還是修改資料,如何保證事務性。即資料只在流處理程式commit操作時一次性寫入HDFS,當程式rollback時,已寫入或部分寫入的資料能隨之刪除。
Hudi是針對以上問題的解決方案之一。以下是對Hudi的簡單介紹,主要內容翻譯自官網。
2. Hudi簡介
2.1 時間線(Timeline)
Hudi內部按照操作時刻(instant)對錶的所有操作維護了一條時間線,由此可以提供表在某一時刻的檢視,還能夠高效的提取出延後到達的資料。每一個時刻包含:
- 時刻行為:對錶操作的型別,包含:
commit:提交,將批次的資料原子性的寫入表;
clean: 清除,後臺作業,不斷清除不需要的舊得版本的資料;
delta_commit:delta 提交是將批次記錄原子性的寫入MergeOnRead表中,資料寫入的目的地是delta日誌檔案;
compacttion:壓縮,後臺作業,將不同結構的資料,例如記錄更新操作的行式儲存的日誌檔案合併到列式儲存的檔案中。壓縮本身是一個特殊的commit操作;
rollback:回滾,一些不成功時,刪除所有部分寫入的檔案;
savepoint:儲存點,標誌某些檔案組為“儲存的“,這樣cleaner就不會刪除這些檔案;
- 時刻時間:操作開始的時間戳;
- 狀態:時刻的當前狀態,包含:
requested 某個操作被安排執行,但尚未初始化
inflight 某個操作正在執行
completed 某一個操作在時間線上已經完成
Hudi保證按照時間線執行的操作按照時刻時間具有原子性及時間線一致性。
2.2 檔案管理
Hudi表存在在DFS系統的 base path(使用者寫入Hudi時自定義) 目錄下,在該目錄下被分成不同的分割槽。每一個分割槽以 partition path 作為唯一的標識,組織形式與Hive相同。
每一個分割槽內,檔案通過唯一的 FileId 檔案id 劃分到 FileGroup 檔案組。每一個FileGroup包含多個 FileSlice 檔案切片,每一個切片包含一個由commit或compaction操作形成的base file 基礎檔案(parquet檔案),以及包含對基礎檔案進行inserts/update操作的log files 日誌檔案(log檔案)。Hudi採用了MVCC設計,compaction操作會將日誌檔案和對應的基礎檔案合併成新的檔案切片,clean操作則刪除無效的或老版本的檔案。
2.3 索引
Hudi通過對映Hoodie鍵(記錄鍵+ 分割槽路徑)到檔案id,提供了高效的upsert操作。當第一個版本的記錄寫入檔案時,這個記錄鍵值和檔案的對映關係就不會發生任何改變。換言之,對映的檔案組始終包含一組記錄的所有版本。
2.4 表型別&查詢
Hudi表型別定義了資料是如何被索引、分佈到DFS系統,以及以上基本屬性和時間線事件如何施加在這個組織上。查詢型別定義了底層資料如何暴露給查詢。
表型別 | 支援的查詢型別 |
---|---|
Copy On Write寫時複製 | 快照查詢 + 增量查詢 |
Merge On Read讀時合併 | 快照查詢 + 增量查詢 + 讀取優化 |
2.4.1 表型別
Copy On Write:僅採用列式儲存檔案(parquet)儲存檔案。更新資料時,在寫入的同時同步合併檔案,僅僅修改檔案的版次並重寫。
Merge On Read:採用列式儲存檔案(parquet)+行式儲存檔案(avro)儲存資料。更新資料時,新資料被寫入delta檔案並隨後以非同步或同步的方式合併成新版本的列式儲存檔案。
取捨 | CopyOnWrite | MergeOnRead |
---|---|---|
資料延遲 | 高 | 低 |
Update cost (I/O)更新操作開銷(I/O) | 高(重寫整個parquet) | 低(追加到delta記錄) |
Parquet檔案大小 | 小(高更新(I/O)開銷) | 大(低更新開銷) |
寫入頻率 | 高 | 低(取決於合併策略) |
2.4.2 查詢型別
- 快照查詢:查詢會看到以後的提交操作和合並操作的最新的錶快照。對於merge on read表,會將最新的基礎檔案和delta檔案進行合併,從而會看到近實時的資料(幾分鐘的延遲)。對於copy on write表,當存在更新/刪除操作時或其他寫操作時,會直接代替已有的parquet表。
- 增量查詢:查詢只會看到給定提交/合併操作之後新寫入的資料。由此有效的提供了變更流,從而實現了增量資料管道。
- 讀優化查詢:查詢會看到給定提交/合併操作之後表的最新快照。只會檢視到最新的檔案切片中的基礎/列式儲存檔案,並且保證和非hudi列式儲存表相同的查詢效率。
取捨 | 快照 | 讀取優化 |
---|---|---|
資料延遲 | 低 | 高 |
查詢延遲 | 高(合併基礎/列式儲存檔案 + 行式儲存delta / 日誌 檔案) | 低(原有的基礎/列式儲存檔案查詢效能) |
3. Spark結構化流寫入Hudi
以下是整合spark結構化流+hudi的示意程式碼,由於Hudi OutputFormat目前只支援在spark rdd物件中呼叫,因此寫入HDFS操作採用了spark structured streaming的forEachBatch運算元。具體說明見註釋。
package pers.machi.sparkhudi
import org.apache.log4j.Logger
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
object SparkHudi {
val logger = Logger.getLogger(SparkHudi.getClass)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("SparkHudi")
//.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.default.parallelism", 9)
.config("spark.sql.shuffle.partitions", 9)
.enableHiveSupport()
.getOrCreate()
// 新增監聽器,每一批次處理完成,將該批次的相關資訊,如起始offset,抓取記錄數量,處理時間列印到控制檯
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
// 定義kafka流
val dataStreamReader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testTopic")
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", 100000)
.option("failOnDataLoss", false)
// 載入流資料,這裡因為只是測試使用,直接讀取kafka訊息而不做其他處理,是spark結構化流會自動生成每一套訊息對應的kafka元資料,如訊息所在主題,分割槽,訊息對應offset等。
val df = dataStreamReader.load()
.selectExpr(
"topic as kafka_topic"
"CAST(partition AS STRING) kafka_partition",
"cast(timestamp as String) kafka_timestamp",
"CAST(offset AS STRING) kafka_offset",
"CAST(key AS STRING) kafka_key",
"CAST(value AS STRING) kafka_value",
"current_timestamp() current_time",
)
.selectExpr(
"kafka_topic"
"concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
"kafka_offset",
"kafka_timestamp",
"kafka_key",
"kafka_value",
"substr(current_time,1,10) partition_date")
// 建立並啟動query
val query = df
.writeStream
.queryName("demo").
.foreachBatch { (batchDF: DataFrame, _: Long) => {
batchDF.persist()
println(LocalDateTime.now() + "start writing cow table")
batchDF.write.format("org.apache.hudi")
.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
.option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")
// 以kafka分割槽和偏移量作為組合主鍵
.option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")
// 以當前日期作為分割槽
.option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")
.option(TABLE_NAME, "copy_on_write_table")
.option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
.mode(SaveMode.Append)
.save("/tmp/sparkHudi/COPY_ON_WRITE")
println(LocalDateTime.now() + "start writing mor table")
batchDF.write.format("org.apache.hudi")
.option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ")
.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
.option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")
.option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")
.option(TABLE_NAME, "merge_on_read_table")
.option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
.mode(SaveMode.Append)
.save("/tmp/sparkHudi/MERGE_ON_READ")
println(LocalDateTime.now() + "finish")
batchDF.unpersist()
}
}
.option("checkpointLocation", "/tmp/sparkHudi/checkpoint/")
.start()
query.awaitTermination()
}
}
4. 測試結果
受限於測試條件,這次測試沒有考慮update操作,而僅僅是測試hudi對追加新資料的效能。
資料程式一共執行5天,期間未發生報錯導致程式退出。
kafka每天讀取資料約1500萬條,被消費的topic共有9個分割槽。
幾點說明如下
1 是否有資料丟失及重複
由於每條記錄的分割槽+偏移量具有唯一性,通過檢查同一分割槽下是否有偏移量重複及不連續的情況,可以斷定資料不存丟失及重複消費的情況。
2 最小可支援的單日寫入資料條數
資料寫入效率,對於cow及mor表,不存在更新操作時,寫入速率接近。這本次測試中,spark每秒處理約170條記錄。單日可處理1500萬條記錄。
3 cow和mor表文件大小對比
每十分鐘讀取兩種表同一分割槽小檔案大小,單位M。結果如下圖,mor表文件大小增加較大,佔用磁碟資源較多。不存在更新操作時,儘可能使用cow表。
相關推薦
實戰|使用Spark Streaming寫入Hudi
1. 專案背景 傳統數倉的組織架構是針對離線資料的OLAP(聯機事務分析)需求設計的,常用的匯入資料方式為採用sqoop或spark定時作業逐批將業務庫資料匯入數倉。隨著資料分析對實時性要求的不斷提高,按小時、甚至分鐘級的資料同步越來越普遍。由此展開了基於spark/flink流處理機制的(準)實時同步系統的
Scala和Java二種方式實戰Spark Streaming開發
在這裡我主要借鑑課上老師講的以及官網的API來進行簡單的Spark Streaming的開發: 一:java形式: 1.我們可以總結一下步驟: 第一步:建立SparkConf物件 第二步:建立SparkStreamingContext 第三步:建立愛你
第102講: 動手實戰Spark Streaming自定義Receiver並進行除錯和測試
有興趣想學習國內整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq 471186150。共享視訊,價效比超高! 1:SparkStreaming雖然說已經支援了很多不同型別的資料來源。但是有時候可能我們的一些資料來源非
spark streaming 寫入db,hdfs
package main.java import java.sql.Connection import com.jolbox.bonecp.{BoneCP, BoneCPConfig} import org.slf4j.LoggerFactory /
下載基於大數據技術推薦系統實戰教程(Spark ML Spark Streaming Kafka Hadoop Mahout Flume Sqoop Redis)
大數據技術推薦系統 推薦系統實戰 地址:http://pan.baidu.com/s/1c2tOtwc 密碼:yn2r82課高清完整版,轉一播放碼。互聯網行業是大數據應用最前沿的陣地,目前主流的大數據技術,包括 hadoop,spark等,全部來自於一線互聯網公司。從應用角度講,大數據在互聯網領域主
PK2227-Spark Streaming實時流處理項目實戰
con ans filesize strip for 新年 感覺 post pre PK2227-Spark Streaming實時流處理項目實戰 新年伊始,學習要趁早,點滴記錄,學習就是進步! 隨筆背景:在很多時候,很多入門不久的朋友都會問我:我是從其他語言轉到程序
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記三之銘文升級版
聚集 配置文件 ssi path fig rect 擴展 str 控制臺 銘文一級: Flume概述Flume is a distributed, reliable, and available service for efficiently collecting(收集),
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記五之銘文升級版
環境變量 local server 節點數 replicas conn 配置環境 park 所有 銘文一級: 單節點單broker的部署及使用 $KAFKA_HOME/config/server.propertiesbroker.id=0listenershost.name
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記九之銘文升級版
file sin ssi 右上角 result map tap 核心 內容 銘文一級: 核心概念:StreamingContext def this(sparkContext: SparkContext, batchDuration: Duration) = { th
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十之銘文升級版
state 分鐘 mooc 系統數據 使用 連接 var style stream 銘文一級: 第八章:Spark Streaming進階與案例實戰 updateStateByKey算子需求:統計到目前為止累積出現的單詞的個數(需要保持住以前的狀態) java.lang.I
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十五之銘文升級版
spa for 序列 html art mat div pre paths 銘文一級:[木有筆記] 銘文二級: 第12章 Spark Streaming項目實戰 行為日誌分析: 1.訪問量的統計 2.網站黏性 3.推薦 Python實時產生數據 訪問URL->IP
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十六之銘文升級版
.so zook orm 3.1 date nta highlight org 結果 銘文一級: linux crontab 網站:http://tool.lu/crontab 每一分鐘執行一次的crontab表達式: */1 * * * * crontab -e */1
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十七之銘文升級版
eid 實時 root 現在 ava == oop urn 啟動 銘文一級: 功能1:今天到現在為止 實戰課程 的訪問量 yyyyMMdd courseid 使用數據庫來進行存儲我們的統計結果 Spark Streaming把統計結果寫入到數據庫裏面 可視化前端根據:yyy
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記二十之銘文升級版
.get frame 結果 取數據 lena echarts object 原理 四種 銘文一級: Spring Boot整合Echarts動態獲取HBase的數據1) 動態的傳遞進去當天的時間 a) 在代碼中寫死 b) 讓你查詢昨天的、前天的咋辦? 在頁面中放一個時間插
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記二十一之銘文升級版
win7 小時 其他 har safari 北京 web 連接 rim 銘文一級: DataV功能說明1)點擊量分省排名/運營商訪問占比 Spark SQL項目實戰課程: 通過IP就能解析到省份、城市、運營商 2)瀏覽器訪問占比/操作系統占比 Hadoop項目:userAg
基於Flume+Kafka+Spark Streaming打造實時流處理項目實戰課程
大數據本課程從實時數據產生和流向的各個環節出發,通過集成主流的分布式日誌收集框架Flume、分布式消息隊列Kafka、分布式列式數據庫HBase、及當前最火爆的Spark Streaming打造實時流處理項目實戰,讓你掌握實時處理的整套處理流程,達到大數據中級研發工程師的水平!下載地址:百度網盤下載
大資料學習之路106-spark streaming統計結果寫入mysql
我們首先將資料庫的配置資訊寫到配置檔案中。 要使用配置檔案的話,首先我們要在pom檔案中匯入配置檔案讀取依賴: <dependency> <groupId>com.typesafe</groupId>
Spark Streaming實時流處理專案實戰筆記
第二章 分散式日誌收集框架Flume 課程目錄 業務現狀分析=>flume概述=>flume架構及核心元件=>flume環境部署=>flume實戰 1、業務現狀分析 WebServer/ApplicationServer分散在各個機器上 大資
Spark Streaming實時流處理專案實戰筆記一
Spark Streaming實時流處理專案實戰筆記一 視訊資源下載:https://download.csdn.net/download/mys_mys/10778011 第一章:課程介紹 Hadoop環境:虛擬機器Centos6.4 Window:VMware 本地登入到
Spark Streaming實時流處理專案實戰 慕課知識點總結
一直比較推崇學習的時候帶著問題去思考 1 Spark transformation和action的區別 簡介: 1,transformation是得到一個新的RDD,方式很多,比如從資料來源生成一個新的RDD,從RDD生成一個新的RDD 2,action是得到一個值,或者一個結