
Hudi - Streaming Writes to Hudi with Structured Streaming

Scenario

Data produced in real time is written to Kafka; Spark reads the Kafka data continuously with Structured Streaming and writes it into a Hudi table.
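For context, here is a minimal sketch of a test producer that publishes order events in the JSON shape the streaming job below expects. The field names (userId, orderTime, ip, orderMoney, orderStatus), the topic name order-topic, and the broker address are taken from or implied by the job code; the sample values are made up for illustration.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object OrderProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // key = order id (becomes the Hudi record key), value = order JSON
    val orderId = java.util.UUID.randomUUID().toString
    val message =
      s"""{"userId":"u_1001","orderTime":"2022-03-03 22:16:00.000","ip":"127.0.0.1","orderMoney":"99.99","orderStatus":"0"}"""
    producer.send(new ProducerRecord[String, String]("order-topic", orderId, message))

    producer.flush()
    producer.close()
  }
}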

Implementation

package com.zhen.hudi.streaming

import com.zhen.hudi.didi.SparkUtils
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

/**
 * @Author FengZhen
 * @Date 3/3/22 10:16 PM
 * @Description Consume data from Kafka in real time with Structured Streaming,
 *              run it through an ETL transformation, and save it to a Hudi table
 */
object HudiStructureDemo {

  /**
   * Consume data in real time from the given Kafka topic
   * @param spark
   * @param topicName
   * @return
   */
  def readFromKafka(spark: SparkSession, topicName: String): DataFrame = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", topicName)
      // where to start consuming
      .option("startingOffsets", "latest")
      // process at most 100,000 records per trigger
      .option("maxOffsetsPerTrigger", 100000)
      // do not fail the query on data loss
      .option("failOnDataLoss", "false")
      .load()
  }

  /**
   * Transform the data read from Kafka: extract every field as a String
   * so it can be stored in the Hudi table
   * @param streamDF
   * @return
   */
  def process(streamDF: DataFrame): DataFrame = {
    streamDF
      // select the fields to keep
      .selectExpr(
        "CAST(key AS STRING) AS order_id",
        "CAST(value AS STRING) AS message",
        "topic", "partition", "offset", "timestamp"
      )
      // parse the message JSON and extract the field values
      .withColumn("user_id", get_json_object(col("message"), "$.userId"))
      .withColumn("order_time", get_json_object(col("message"), "$.orderTime"))
      .withColumn("ip", get_json_object(col("message"), "$.ip"))
      .withColumn("order_money", get_json_object(col("message"), "$.orderMoney"))
      .withColumn("order_status", get_json_object(col("message"), "$.orderStatus"))
      // drop the raw message column
      .drop(col("message"))
      // parse the order time into a timestamp, used as the precombine (merge) field of the Hudi table
      .withColumn("ts", to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss.SSS"))
      // extract the partition date (yyyy-MM-dd) from the order time
      .withColumn("day", substring(col("order_time"), 0, 10))
  }

  /**
   * Save the streaming DataFrame into a Hudi table
   * @param streamDF
   */
  def saveToHudi(streamDF: DataFrame): Unit = {
    streamDF.writeStream
      .outputMode(OutputMode.Append())
      .queryName("query-hudi-streaming")
      .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
        println(s"=============== BatchId: ${batchId} start =============== ")
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._
        import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
        batchDF.write
          .mode(SaveMode.Append)
          .format("hudi")
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          // Hudi table properties
          // record key
          .option(RECORDKEY_FIELD.key(), "order_id")
          // precombine field
          .option(PRECOMBINE_FIELD.key(), "ts")
          // partition path field
          .option(PARTITIONPATH_FIELD.key(), "day")
          // partition directories use the Hive-style layout (consistent with Hive partitioning)
          .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
          // table name
          .option(TBL_NAME.key(), "tbl_hudi_order")
          // merge-on-read table type
          .option(TABLE_TYPE.key(), "MERGE_ON_READ")
          .save("/hudi-warehouse/tbl_hudi_order")
      })
      .option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-1001")
      .start()
  }

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
    // 2. Consume data from Kafka in real time
    val kafkaStreamDF: DataFrame = readFromKafka(spark, "order-topic")
    // 3. Extract fields and convert data types
    val streamDF: DataFrame = process(kafkaStreamDF)
    // 4. Save the data into a Hudi table: MERGE_ON_READ type, files are merged when the table is read
    saveToHudi(streamDF)
    // 5. After the streaming query starts, wait for termination
    spark.streams.active.foreach(query => println(s"Query: ${query.name} is Running"))
    spark.streams.awaitAnyTermination()
  }
}