1. 程式人生 > >如何在Spark中使用動態資料轉置

如何在Spark中使用動態資料轉置

Dynamic Transpose是Spark中的一個關鍵轉換,因為它需要大量的迭代。本文將為您提供有關如何使用記憶體中運算子處理此複雜方案的清晰概念。

首先,讓我們看看我們擁有的源資料: 

idoc_number,訂單ID,idoc_qualifier_org,idoc_org
7738,2364,6,0
7738,2364,7,0
7738,2364,8,mystr1
7738,2364,12,mystr2
7739,2365,12,mystr3
7739,2365,7,mystr4

我們還有idoc_qualifier_org 源資料記錄中列的查詢表  

由於查詢表的大小會更小,我們可以預期它會在快取中和驅動程式記憶體中。

預選賽,降序
6,司
7,分銷渠道
8,銷售組織
12,訂單型別

Dynamic Transpose操作的預期輸出是:

idoc_number,order_id,Division,Distribution Channel,Sales org,Order Type
7738,2364,0,0,mystr1,mystr2
7739,2365,空,mystr3,空,mystr4

以下程式碼實際上將根據資料中的當前列轉置資料。此程式碼是使用Spark中的Transpose Data的另一種方法。

此程式碼嚴格使用Spark的複雜資料型別,並且還負責迭代的效能。 

物件 DynamicTranspose {
 def  dataValidator(map_val:Seq [ Map [ String,String ]],rule:String):String  = {
  嘗試 {
   val  rule_array  =  規則。拆分(“#!”)。toList
   val  src_map  =  map_val。toList。壓扁。toMap
   var  output_str  =  “”
   rule_array。foreach(f  =>
    output_str  =  output_str  +  “!”  +  src_map。getOrElse(f,“#”)
   )

   return  output_str。掉落(1)
  } catch {
   案例 t:
    Throwable  =>  t。printStackTrace()。toString()
    返回 “0”。toString()
  }

 }

 def  main(args:Array [ String ]):Unit  = {

  val  spark  =  SparkSession。builder()。主人(“本地[*]”)。config(“spark.sql.warehouse.dir”,“<src dir>”)。getOrCreate()
  val  data_df  =  spark。讀。選項(“標題”,“真”)。csv(“<data path src>”)
  val  lkp_df  =  spark。讀。選項(“標題”,“真”)。csv(“查詢路徑源>”)
  進口 火花。暗示。_
  進口 組織。阿帕奇。火花。sql。功能。廣播

  val  lkp_df_brdcast  =  broadcast(lkp_df)
  val  result_df  =  data_df。加入(廣播(lkp_df_brdcast),$  “idoc_qualifier_org”  ===  $  “限定符”,“內部”)

  val  df1  =  result_df。groupBy(col(“idoc_number”),col(“orderid”))。agg(collect_list(map($  “desc”,$  “idoc_org”))as  “map”)
  進口 組織。阿帕奇。火花。sql。功能。UDF
  進口 組織。阿帕奇。火花。sql。功能。{
   點燃,
   最大,
   ROW_NUMBER
  }
  進口 火花。暗示。_
  進口 組織。阿帕奇。火花。sql。行
  val  map_val  =  lkp_df。rdd。地圖(行 =>  行。的getString(1))。收集()。mkString(“#!”)
  火花。sparkContext。廣播(map_val)
  VAL  recdValidator  =  UDF(dataValidator  _)
  var  latest_df  =  df1。withColumn(“explode_out”,split(recdValidator(df1(“map”),lit(map_val)),“!”))。掉落(“地圖”)
  val  columns  =  map_val。拆分(“#!”)。toList
  latest_df  =  列。zipWithIndex。foldLeft(latest_df){
   (memodDF,專欄)=> {
    memodDF。withColumn(柱。_1,山口(“explode_out” )(柱。_2))
   }
  }
  。drop(“explode_out”)
  latest_df。show()
 }

}

希望這可以幫助!