如何在Spark中使用動態資料轉置
阿新 • • 發佈:2018-12-18
Dynamic Transpose是Spark中的一個關鍵轉換,因為它需要大量的迭代。本文將為您提供有關如何使用記憶體中運算子處理此複雜方案的清晰概念。
首先,讓我們看看我們擁有的源資料:
idoc_number,訂單ID,idoc_qualifier_org,idoc_org
7738,2364,6,0
7738,2364,7,0
7738,2364,8,mystr1
7738,2364,12,mystr2
7739,2365,12,mystr3
7739,2365,7,mystr4
我們還有idoc_qualifier_org
源資料記錄中列的查詢表 。由於查詢表的大小會更小,我們可以預期它會在快取中和驅動程式記憶體中。
預選賽,降序
6,司
7,分銷渠道
8,銷售組織
12,訂單型別
Dynamic Transpose操作的預期輸出是:
idoc_number,order_id,Division,Distribution Channel,Sales org,Order Type
7738,2364,0,0,mystr1,mystr2
7739,2365,空,mystr3,空,mystr4
以下程式碼實際上將根據資料中的當前列轉置資料。此程式碼是使用Spark中的Transpose Data的另一種方法。 此程式碼嚴格使用Spark的複雜資料型別,並且還負責迭代的效能。
物件 DynamicTranspose {
def dataValidator(map_val:Seq [ Map [ String,String ]],rule:String):String = {
嘗試 {
val rule_array = 規則。拆分(“#!”)。toList
val src_map = map_val。toList。壓扁。toMap
var output_str = “”
rule_array。foreach(f =>
output_str = output_str + “!” + src_map。getOrElse(f,“#”)
)
return output_str。掉落(1)
} catch {
案例 t:
Throwable => t。printStackTrace()。toString()
返回 “0”。toString()
}
}
def main(args:Array [ String ]):Unit = {
val spark = SparkSession。builder()。主人(“本地[*]”)。config(“spark.sql.warehouse.dir”,“<src dir>”)。getOrCreate()
val data_df = spark。讀。選項(“標題”,“真”)。csv(“<data path src>”)
val lkp_df = spark。讀。選項(“標題”,“真”)。csv(“查詢路徑源>”)
進口 火花。暗示。_
進口 組織。阿帕奇。火花。sql。功能。廣播
val lkp_df_brdcast = broadcast(lkp_df)
val result_df = data_df。加入(廣播(lkp_df_brdcast),$ “idoc_qualifier_org” === $ “限定符”,“內部”)
val df1 = result_df。groupBy(col(“idoc_number”),col(“orderid”))。agg(collect_list(map($ “desc”,$ “idoc_org”))as “map”)
進口 組織。阿帕奇。火花。sql。功能。UDF
進口 組織。阿帕奇。火花。sql。功能。{
點燃,
最大,
ROW_NUMBER
}
進口 火花。暗示。_
進口 組織。阿帕奇。火花。sql。行
val map_val = lkp_df。rdd。地圖(行 => 行。的getString(1))。收集()。mkString(“#!”)
火花。sparkContext。廣播(map_val)
VAL recdValidator = UDF(dataValidator _)
var latest_df = df1。withColumn(“explode_out”,split(recdValidator(df1(“map”),lit(map_val)),“!”))。掉落(“地圖”)
val columns = map_val。拆分(“#!”)。toList
latest_df = 列。zipWithIndex。foldLeft(latest_df){
(memodDF,專欄)=> {
memodDF。withColumn(柱。_1,山口(“explode_out” )(柱。_2))
}
}
。drop(“explode_out”)
latest_df。show()
}
}
希望這可以幫助!