Hudi: Analyzing Didi Ride-Hailing Data with Spark
阿新 · Published 2022-03-02
Utility Class
package com.zhen.hudi.didi

import org.apache.spark.sql.SparkSession

/**
 * @Author FengZhen
 * @Date 3/1/22 9:34 PM
 * @Description Utility class for SparkSQL data operations (loading/reading and
 *              saving/writing), e.g. building a SparkSession instance.
 */
object SparkUtils {

  /**
   * Builds a SparkSession instance; by default the application runs in local mode.
   * @return
   */
  def createSparkSession(clazz: Class[_], master: String = "local[4]", partitions: Int = 4): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", partitions)
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession(this.getClass)
    println(spark)
    Thread.sleep(1000 * 100)
    spark.stop()
  }
}
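The Kryo serializer setting above matters because Hudi requires it when writing through Spark. The post does not show how the Hudi classes get onto the classpath; as a minimal sketch (the Scala, Spark, and Hudi version numbers here are assumptions, not taken from the post), an sbt build could look like this:

// build.sbt -- illustrative only; all version numbers are assumptions
ThisBuild / scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  // Spark SQL, typically provided by the cluster at runtime
  "org.apache.spark" %% "spark-sql"          % "3.1.2" % "provided",
  // Hudi bundle for Spark 3, supplying the DataSourceWriteOptions used below
  "org.apache.hudi"  %% "hudi-spark3-bundle" % "0.10.1"
)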
Writing the CSV File into Hudi
package com.zhen.hudi.didi

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/**
 * @Author FengZhen
 * @Date 3/1/22 9:29 PM
 * @Description Operational analysis of Didi Haikou trip data. Uses SparkSQL to
 *              read the raw CSV file and save it into a Hudi table.
 *              1. Build the SparkSession instance (integrating Hudi and HDFS)
 *              2. Load the local CSV file of Didi trip data
 *              3. ETL the Didi trip data
 *              4. Save the transformed data into the Hudi table
 *              5. Close resources when the application ends
 */
object DidiStorageSpark {

  val datasPath: String = "file:////Users/FengZhen/Desktop/accumulate/0_project/hudi-learning/datas/didi/dwv_order_make_haikou_1.txt"
  val hudiTableName: String = "tbl_didi_haikou"
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance (integrating Hudi and HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)

    // 2. Load the local CSV file of Didi trip data
    val didiDF = readCsvFile(spark, datasPath)
    // didiDF.printSchema()
    // didiDF.show(10, false)

    // 3. ETL the Didi trip data
    val etlDF: DataFrame = process(didiDF)
    // etlDF.printSchema()
    // etlDF.show(10, false)

    // 4. Save the transformed data into the Hudi table
    saveToHudi(etlDF, hudiTableName, hudiTablePath)

    // 5. Close resources when the application ends
    spark.stop()
  }

  /**
   * Reads the CSV-formatted text file into a DataFrame.
   * @param spark
   * @param path
   * @return
   */
  def readCsvFile(spark: SparkSession, path: String): DataFrame = {
    spark.read
      // the field separator is a tab character
      .option("sep", "\\t")
      // the first line holds the column names
      .option("header", "true")
      // infer each column's data type from its values
      .option("inferSchema", "true")
      // path of the file to load
      .csv(path)
  }

  /**
   * ETL transformation of the Didi Haikou trip data: derive the ts and partitionpath fields.
   * @param dataFrame
   * @return
   */
  def process(dataFrame: DataFrame): DataFrame = {
    dataFrame
      // add the Hudi partition field, three partition levels -> yyyy-MM-dd
      .withColumn(
        "partitionpath",
        concat_ws("-", col("year"), col("month"), col("day"))
      )
      // drop the source columns
      .drop("year", "month", "day")
      // add a timestamp column used as the Hudi table's precombine field; use the departure time
      .withColumn(
        "ts",
        unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss")
      )
  }

  /**
   * Saves the DataFrame into a Hudi table. The table type is COW (copy-on-write),
   * suited to batch writes with a write-light, read-heavy workload.
   * @param dataFrame
   * @param table
   * @param path
   */
  def saveToHudi(dataFrame: DataFrame, table: String, path: String): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    // save the data
    dataFrame.write
      .mode(SaveMode.Overwrite)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi table properties
      // record key
      .option(RECORDKEY_FIELD.key(), "order_id")
      // precombine field
      .option(PRECOMBINE_FIELD.key(), "ts")
      // partition field
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      // table name
      .option(TBL_NAME.key(), table)
      .save(path)
  }
}
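Once the job finishes, the write can be sanity-checked by reading the table straight back: Hudi prepends _hoodie_* metadata columns (commit time, record key, partition path, and so on) to every record. A minimal sketch, assuming the same SparkSession setup as above:

// Quick verification read after saveToHudi; run in the same SparkSession
// (or in a spark-shell with the Hudi bundle loaded).
val checkDF = spark.read.format("hudi").load("/hudi-warehouse/tbl_didi_haikou")
checkDF
  .select("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_partition_path", "order_id", "ts")
  .show(5, false)
println(s"rows written: ${checkDF.count()}")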
Business Analysis
package com.zhen.hudi.didi

import java.util.{Calendar, Date}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._

/**
 * @Author FengZhen
 * @Date 3/1/22 9:31 PM
 * @Description Operational analysis of Didi Haikou trip data. Uses SparkSQL to
 *              load the Hudi table data and compute the required business metrics.
 *              Spark DataSource API:
 *                spark.read.format("hudi")..
 *                dataframe.write.format("hudi")
 *
 *              Since Spark 2.x the program entry point is SparkSession.
 */
object DidiAnalysisSpark {

  // Hudi table property: HDFS path where the data is stored
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance (integrating Hudi and HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass, partitions = 8)

    // 2. Load the Hudi table data, selecting the required fields
    val hudiDF: DataFrame = readFromHudi(spark, hudiTablePath)
    hudiDF.printSchema()
    hudiDF.show(10, false)

    // the data is used multiple times, so caching is recommended
    hudiDF.persist(StorageLevel.MEMORY_AND_DISK)

    // 3. Compute the business metrics
    // metric 1: order counts by product line (product_id)
    //reportProduct(hudiDF)
    // metric 2: order counts by booking type
    //reportType(hudiDF)
    // metric 3: order counts by traffic type
    //reportTraffic(hudiDF)
    // metric 4: order counts by price range
    //reportPrice(hudiDF)
    // metric 5: order counts by trip distance
    //reportDistance(hudiDF)
    // metric 6: order counts by day of week
    reportWeek(hudiDF)

    // release the cache once the data is no longer needed
    hudiDF.unpersist()

    // 4. Close resources when the application ends
    spark.stop()
  }

  /**
   * Loads the Hudi table data into a DataFrame.
   * @param spark
   * @param path
   * @return
   */
  def readFromHudi(spark: SparkSession, path: String): DataFrame = {
    val didiDF: DataFrame = spark.read.format("hudi").load(path)
    // select the fields needed for the reports
    didiDF.select(
      "product_id", "type", "traffic_type", "pre_total_fee", "start_dest_distance", "departure_time"
    )
  }

  /**
   * Order counts by product line, field: product_id
   * @param dataFrame
   */
  def reportProduct(dataFrame: DataFrame): Unit = {
    // 1 滴滴專車, 2 滴滴企業專車, 3 滴滴快車, 4 滴滴企業快車
    // a. group by product line id and count
    val reportDF: DataFrame = dataFrame.groupBy("product_id").count()

    // b. user-defined function mapping ids to names
    val to_name = udf(
      (productId: Int) => {
        productId match {
          case 1 => "滴滴專車"
          case 2 => "滴滴企業專車"
          case 3 => "滴滴快車"
          case 4 => "滴滴企業快車"
          case _ => "未知"
        }
      }
    )

    // c. translate ids to names
    val resultDF: DataFrame = reportDF.select(
      to_name(col("product_id")).as("order_type"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Order counts by booking type (real-time vs. reservation), field: type
   * @param dataFrame
   */
  def reportType(dataFrame: DataFrame): Unit = {
    // a. group by booking type id and count
    val reportDF: DataFrame = dataFrame.groupBy("type").count()

    // b. user-defined function mapping ids to names
    val to_name = udf(
      (realtimeType: Int) => {
        realtimeType match {
          case 0 => "實時"
          case 1 => "預約"
          case _ => "未知"
        }
      }
    )

    // c. translate ids to names
    val resultDF: DataFrame = reportDF.select(
      to_name(col("type")).as("order_realtime"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Order counts by traffic type, field: traffic_type
   * @param dataFrame
   */
  def reportTraffic(dataFrame: DataFrame): Unit = {
    // a. group by traffic type and count
    val reportDF: DataFrame = dataFrame.groupBy("traffic_type").count()

    // b. user-defined function mapping ids to names
    val to_name = udf(
      (trafficType: Int) => {
        trafficType match {
          case 0 => "普通散客"
          case 1 => "企業時租"
          case 2 => "企業接機套餐"
          case 3 => "企業送機套餐"
          case 4 => "拼車"
          case 5 => "接機"
          case 6 => "送機"
          case 302 => "跨域拼車"
          case _ => "未知"
        }
      }
    )

    // c. translate ids to names
    val resultDF: DataFrame = reportDF.select(
      to_name(col("traffic_type")).as("traffic_type"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Order counts by price range: bucket pre_total_fee first, then count each bucket.
   * @param dataFrame
   */
  def reportPrice(dataFrame: DataFrame): Unit = {
    val resultDF: DataFrame = dataFrame
      .agg(
        // price 0-15
        sum(
          when(col("pre_total_fee").between(0, 15), 1).otherwise(0)
        ).as("0-15"),
        // price 16-30
        sum(
          when(col("pre_total_fee").between(16, 30), 1).otherwise(0)
        ).as("16-30"),
        // price 31-50
        sum(
          when(col("pre_total_fee").between(31, 50), 1).otherwise(0)
        ).as("31-50"),
        // price 51-100
        sum(
          when(col("pre_total_fee").between(51, 100), 1).otherwise(0)
        ).as("51-100"),
        // price 100+
        sum(
          when(col("pre_total_fee").gt(100), 1).otherwise(0)
        ).as("100+")
      )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Order counts by trip distance: bucket start_dest_distance (in meters) first,
   * then count each bucket.
   * @param dataFrame
   */
  def reportDistance(dataFrame: DataFrame): Unit = {
    val resultDF: DataFrame = dataFrame
      .agg(
        // distance 0-10km
        sum(
          when(col("start_dest_distance").between(0, 10000), 1).otherwise(0)
        ).as("0-10km"),
        // distance 10-20km
        sum(
          when(col("start_dest_distance").between(10001, 20000), 1).otherwise(0)
        ).as("10-20km"),
        // distance 20-30km
        sum(
          when(col("start_dest_distance").between(20001, 30000), 1).otherwise(0)
        ).as("20-30km"),
        // distance 30-50km
        sum(
          when(col("start_dest_distance").between(30001, 50000), 1).otherwise(0)
        ).as("30-50km"),
        // distance 50km+
        sum(
          when(col("start_dest_distance").gt(50000), 1).otherwise(0)
        ).as("50km+")
      )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Convert the departure time to a day of week and count orders per day,
   * field: departure_time
   * @param dataFrame
   */
  def reportWeek(dataFrame: DataFrame): Unit = {
    // user-defined function mapping a datetime string to a day-of-week name
    val to_week = udf(
      (departureTime: String) => {
        val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
        val calendar: Calendar = Calendar.getInstance()

        val date: Date = format.parse(departureTime)
        calendar.setTime(date)

        val dayWeek = calendar.get(Calendar.DAY_OF_WEEK) match {
          case 1 => "星期日"
          case 2 => "星期一"
          case 3 => "星期二"
          case 4 => "星期三"
          case 5 => "星期四"
          case 6 => "星期五"
          case 7 => "星期六"
        }
        dayWeek
      }
    )

    val resultDF: DataFrame = dataFrame
      .select(
        to_week(col("departure_time")).as("week")
      )
      .groupBy(col("week")).count()
      .select(
        col("week"), col("count").as("total")
      )
      .orderBy(col("total").desc)
    resultDF.printSchema()
    resultDF.show(10, false)
  }
}
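For reference, the weekday report can also be expressed with Spark's built-in date functions instead of a Calendar-based UDF, which keeps the computation inside the SQL engine. A minimal equivalent sketch, assuming the hudiDF loaded in main and that departure_time casts cleanly to a date:

// Same day-of-week report without a UDF. dayofweek() returns 1 for Sunday
// through 7 for Saturday, matching Calendar.DAY_OF_WEEK, so the UDF's
// id-to-name mapping carries over directly if names are needed.
val weekDF = hudiDF
  .withColumn("dow", dayofweek(to_date(col("departure_time"))))
  .groupBy("dow").count()
  .orderBy(col("count").desc)
weekDF.show(7, false)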