
Hudi - Analyzing Didi Ride Data with Spark

Utility class

package com.zhen.hudi.didi

import org.apache.spark.sql.SparkSession

/**
  * @Author FengZhen
  * @Date 3/1/22 9:34 PM
  * @Description Utility class for Spark SQL data access (loading/reading and saving/writing),
  *              e.g. building the SparkSession instance.
  */
object SparkUtils {

  /**
    * Build a SparkSession instance; by default it runs in local mode.
    * @return
    */
  def createSparkSession(clazz: Class[_], master: String = "local[4]", partitions: Int = 4): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      //Hudi requires Kryo serialization
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", partitions)
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession(this.getClass)
    println(spark)
    Thread.sleep(1000 * 100)
    spark.stop()
  }
}
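
A minimal usage sketch (not in the original post): callers can override the defaults when a job needs a different master or shuffle parallelism. The "yarn" master below is purely illustrative.

val spark: SparkSession = SparkUtils.createSparkSession(
  this.getClass,
  master = "yarn", //assumption: a YARN cluster is available; the default is local[4]
  partitions = 8
)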

Writing the CSV file into Hudi

package com.zhen.hudi.didi

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/**
  * @Author FengZhen
  * @Date 3/1/22 9:29 PM
  * @Description Analysis of Didi Haikou ride data: use Spark SQL to read the CSV file
  *              and save it into a Hudi table.
  *   1. Build the SparkSession instance (integrating Hudi and HDFS)
  *   2. Load the local CSV file with the Didi ride data
  *   3. ETL the Didi ride data
  *   4. Save the transformed data into the Hudi table
  *   5. Close resources when the application ends
  */
object DidiStorageSpark {

  val datasPath: String = "file:////Users/FengZhen/Desktop/accumulate/0_project/hudi-learning/datas/didi/dwv_order_make_haikou_1.txt"
  val hudiTableName: String = "tbl_didi_haikou"
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {
    //1. Build the SparkSession instance (integrating Hudi and HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)

    //2. Load the local CSV file with the Didi ride data
    val didiDF = readCsvFile(spark, datasPath)
    // didiDF.printSchema()
    // didiDF.show(10, false)

    //3. ETL the Didi ride data
    val etlDF: DataFrame = process(didiDF)
    // etlDF.printSchema()
    // etlDF.show(10, false)

    //4. Save the transformed data into the Hudi table
    saveToHudi(etlDF, hudiTableName, hudiTablePath)

    //5. Close resources when the application ends
    spark.stop()
  }

  /**
    * Read the CSV-format text file into a DataFrame.
    * @param spark
    * @param path
    * @return
    */
  def readCsvFile(spark: SparkSession, path: String): DataFrame = {
    spark.read
      //the separator is a tab character
      .option("sep", "\\t")
      //the first line holds the column names
      .option("header", "true")
      //infer column types from the values
      .option("inferSchema", "true")
      //file path
      .csv(path)
  }

  /**
    * ETL for the Didi Haikou data: derive the ts and partitionpath fields.
    * @param dataFrame
    * @return
    */
  def process(dataFrame: DataFrame): DataFrame = {
    dataFrame
      //add the Hudi partition field, three levels -> yyyy-MM-dd
      .withColumn(
        "partitionpath",
        concat_ws("-", col("year"), col("month"), col("day"))
      )
      //drop the source columns
      .drop("year", "month", "day")
      //add a timestamp column used by Hudi to pre-combine records; use the departure time
      .withColumn(
        "ts",
        unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss")
      )
  }

  /**
    * Save the DataFrame into a Hudi table. The table type is COW (copy on write),
    * which suits batch loads and write-light, read-heavy workloads.
    * @param dataFrame
    * @param table
    * @param path
    */
  def saveToHudi(dataFrame: DataFrame, table: String, path: String): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    //save the data
    dataFrame.write
      .mode(SaveMode.Overwrite)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //Hudi table properties
      //record key
      .option(RECORDKEY_FIELD.key(), "order_id")
      //pre-combine field
      .option(PRECOMBINE_FIELD.key(), "ts")
      //partition path field
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      //table name
      .option(TBL_NAME.key(), table)
      .save(path)
  }
}
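
As a quick sanity check (a sketch, not part of the original listing), the freshly written table can be read back through the same hudi data source; Hudi prepends bookkeeping columns such as _hoodie_commit_time, _hoodie_record_key and _hoodie_partition_path to every record.

//illustrative verification: read the table back and inspect the metadata columns Hudi added
val checkDF = spark.read.format("hudi").load(hudiTablePath)
checkDF
  .select("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_partition_path", "order_id", "ts")
  .show(5, false)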

Business analysis

package com.zhen.hudi.didi

import java.util.{Calendar, Date}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._


/**
  * @Author FengZhen
  * @Date 3/1/22 9:31 PM
  * @Description Analysis of Didi Haikou ride data: use Spark SQL to load the Hudi table
  *              and compute the statistics the business requires.
  * Spark DataSource API style:
  * spark.read.format("hudi")...
  * dataframe.write.format("hudi")
  *
  * Since Spark 2.x, SparkSession is the program entry point.
  */
object DidiAnalysisSpark {

  //Hudi table property: HDFS path where the table data is stored
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {

    //1. Build the SparkSession instance (integrating Hudi and HDFS)
    val spark:SparkSession = SparkUtils.createSparkSession(this.getClass, partitions = 8)

    //2. Load the Hudi table data, selecting the needed fields
    val hudiDF: DataFrame = readFromHudi(spark, hudiTablePath)
    hudiDF.printSchema()
    hudiDF.show(10, false)

    //the data is read several times below, so caching is recommended
    hudiDF.persist(StorageLevel.MEMORY_AND_DISK)

    //3. Compute the business metrics
    //Metric 1: order counts by product type (product_id)
    //reportProduct(hudiDF)

    //Metric 2: order counts by timeliness (real-time vs. reservation)
    //reportType(hudiDF)

    //Metric 3: order counts by traffic type
    //reportTraffic(hudiDF)

    //Metric 4: order counts by price range
    //reportPrice(hudiDF)

    //Metric 5: order counts by trip distance
    //reportDistance(hudiDF)

    //Metric 6: date -> day of the week, then count per weekday
    reportWeek(hudiDF)

    //release the cached data once it is no longer used
    hudiDF.unpersist()

    //4. Close resources when the application ends
    spark.stop()
  }

  /**
    * Load the Hudi table data into a DataFrame.
    * @param spark
    * @param path
    * @return
    */
  def readFromHudi(spark: SparkSession, path: String): DataFrame = {

    val didiDF: DataFrame = spark.read.format("hudi").load(path)

    //select the fields needed for the analysis
    didiDF.select(
      "product_id", "type", "traffic_type", "pre_total_fee", "start_dest_distance", "departure_time"
    )

  }
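
  /**
    * Illustrative sketch, not in the original post: besides the snapshot query
    * above, Hudi also supports incremental queries that only return records
    * committed after a given instant (option keys per Hudi 0.9+).
    */
  def readFromHudiIncremental(spark: SparkSession, path: String, beginTime: String): DataFrame = {
    spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", beginTime) //e.g. "20220301000000"
      .load(path)
  }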

  /**
    * Order counts by order type, field: product_id
    * @param dataFrame
    */
  def reportProduct(dataFrame: DataFrame): Unit = {

    //1 Didi Premier, 2 Didi Enterprise Premier, 3 Didi Express, 4 Didi Enterprise Express
    //a. group by product line ID and count
    val reportDF: DataFrame = dataFrame.groupBy("product_id").count()

    //b. UDF mapping the product ID to a readable name
    val to_name = udf(
      (productId: Int) => {
        productId match {
          case 1 => "Didi Premier"
          case 2 => "Didi Enterprise Premier"
          case 3 => "Didi Express"
          case 4 => "Didi Enterprise Express"
          case _ => "Unknown" //default case so unexpected IDs do not throw a MatchError
        }
      }
    )

    //c. map IDs to names
    val resultDF: DataFrame = reportDF.select(
      to_name(col("product_id")).as("order_type"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)

  }
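
  /**
    * Illustrative alternative, not in the original post: the same mapping
    * expressed with built-in when/otherwise columns, which Catalyst can
    * optimize, unlike an opaque UDF.
    */
  def reportProductBuiltin(dataFrame: DataFrame): Unit = {
    val resultDF: DataFrame = dataFrame.groupBy("product_id").count()
      .select(
        when(col("product_id") === 1, "Didi Premier")
          .when(col("product_id") === 2, "Didi Enterprise Premier")
          .when(col("product_id") === 3, "Didi Express")
          .when(col("product_id") === 4, "Didi Enterprise Express")
          .otherwise("Unknown").as("order_type"),
        col("count").as("total")
      )
    resultDF.show(10, false)
  }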

  /**
    * Order counts by timeliness, field: type
    * @param dataFrame
    */
  def reportType(dataFrame: DataFrame): Unit = {

    //a. group by the timeliness ID and count
    val reportDF: DataFrame = dataFrame.groupBy("type").count()

    //b. UDF mapping the timeliness ID to a readable name
    val to_name = udf(
      (realtimeType: Int) => {
        realtimeType match {
          case 0 => "Real-time"
          case 1 => "Reservation"
          case _ => "Unknown" //default case so unexpected values do not throw a MatchError
        }
      }
    )

    //c. map IDs to names
    val resultDF: DataFrame = reportDF.select(
      to_name(col("type")).as("order_realtime"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)

  }

  /**
    * Order counts by traffic type, field: traffic_type
    * @param dataFrame
    */
  def reportTraffic(dataFrame: DataFrame): Unit = {
    //a. group by the traffic type and count
    val reportDF: DataFrame = dataFrame.groupBy("traffic_type").count()

    //b. UDF mapping the traffic type ID to a readable name
    val to_name = udf(
      (realtimeType: Int) => {
        realtimeType match {
          case 0 => "Regular ride"
          case 1 => "Enterprise hourly rental"
          case 2 => "Enterprise airport pickup package"
          case 3 => "Enterprise airport drop-off package"
          case 4 => "Carpool"
          case 5 => "Airport pickup"
          case 6 => "Airport drop-off"
          case 302 => "Cross-region carpool"
          case _ => "Unknown"
        }
      }
    )

    //c. map IDs to names
    val resultDF: DataFrame = reportDF.select(
      to_name(col("traffic_type")).as("traffic_type"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)

  }

  /**
    * Order counts by price range: bucket pre_total_fee into ranges,
    * then count the orders in each bucket. Field: pre_total_fee
    * @param dataFrame
    * @return
    */
  def reportPrice(dataFrame: DataFrame): Unit = {

    val resultDF: DataFrame = dataFrame
      .agg(
        //price 0-15
        sum(
          when(col("pre_total_fee").between(0, 15), 1).otherwise(0)
        ).as("0-15"),
        //price 16-30
        sum(
          when(col("pre_total_fee").between(16, 30), 1).otherwise(0)
        ).as("16-30"),
        //price 31-50
        sum(
          when(col("pre_total_fee").between(31, 50), 1).otherwise(0)
        ).as("31-50"),
        //price 51-100
        sum(
          when(col("pre_total_fee").between(51, 100), 1).otherwise(0)
        ).as("51-100"),
        //price 100+
        sum(
          when(col("pre_total_fee").gt(100), 1).otherwise(0)
        ).as("100+")
      )

    resultDF.printSchema()
    resultDF.show(10, false)

  }

  /**
    * Order counts by trip distance: bucket start_dest_distance (meters)
    * into ranges, then count the orders in each bucket. Field: start_dest_distance
    * @param dataFrame
    * @return
    */
  def reportDistance(dataFrame: DataFrame): Unit = {

    val resultDF: DataFrame = dataFrame
      .agg(
        //distance 0-10km
        sum(
          when(col("start_dest_distance").between(0, 10000), 1).otherwise(0)
        ).as("0-10km"),
        //distance 10-20km
        sum(
          when(col("start_dest_distance").between(10001, 20000), 1).otherwise(0)
        ).as("10-20km"),
        //distance 20-30km
        sum(
          when(col("start_dest_distance").between(20001, 30000), 1).otherwise(0)
        ).as("20-30km"),
        //distance 30-50km
        sum(
          when(col("start_dest_distance").between(30001, 50000), 1).otherwise(0)
        ).as("30-50km"),
        //distance 50km+
        sum(
          when(col("start_dest_distance").gt(50000), 1).otherwise(0)
        ).as("50km+")
      )

    resultDF.printSchema()
    resultDF.show(10, false)

  }


  /**
    * Convert the departure time to the day of the week and count orders per weekday.
    * Field: departure_time
    * @param dataFrame
    */
  def reportWeek(dataFrame: DataFrame): Unit = {

    //UDF: parse the departure time and map it to the day of the week
    val to_week = udf(
      (departureTime: String) => {
        val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
        val calendar: Calendar = Calendar.getInstance()

        val date: Date = format.parse(departureTime)
        calendar.setTime(date)

        //Calendar.DAY_OF_WEEK is always in 1..7, so no default case is needed
        val dayWeek = calendar.get(Calendar.DAY_OF_WEEK) match {
          case 1 => "Sunday"
          case 2 => "Monday"
          case 3 => "Tuesday"
          case 4 => "Wednesday"
          case 5 => "Thursday"
          case 6 => "Friday"
          case 7 => "Saturday"
        }
        dayWeek
      }
    )

    val resultDF: DataFrame = dataFrame
      .select(
        to_week(col("departure_time")).as("week")
      )
      .groupBy(col("week")).count()
      .select(
        col("week"),
        col("count").as("total")
      )
      .orderBy(col("total").desc)

    resultDF.printSchema()
    resultDF.show(10, false)

  }
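
  /**
    * Illustrative alternative, not in the original post: Spark 2.3+ ships a
    * built-in dayofweek() function (1 = Sunday .. 7 = Saturday), so the same
    * report can be computed without a UDF.
    */
  def reportWeekBuiltin(dataFrame: DataFrame): Unit = {
    val resultDF: DataFrame = dataFrame
      .select(dayofweek(to_date(col("departure_time"))).as("week"))
      .groupBy(col("week")).count()
      .select(col("week"), col("count").as("total"))
      .orderBy(col("total").desc)
    resultDF.show(10, false)
  }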



}