Using a custom UDF on a Spark cluster throws a "Task not serializable" error
On a Spark cluster, a DataFrame built from a CSV file needs one of its columns transformed, and the built-in functions no longer cover the requirement,
so a custom UDF is needed. However, using it throws the following error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@7334dcbe)
    - field (class: algorithm.config.Base, name: sc, type: class org.apache.spark.SparkContext)
    - object (class algorithm.config.Base, algorithm.config.Base@4a6c33a0)
    - field (class: algorithm.config.Base$$anonfun$save_data_parquet$1, name: $outer, type: class algorithm.config.Base)
    - object (class algorithm.config.Base$$anonfun$save_data_parquet$1, <function1>)
    - field (class: algorithm.config.Base$$anonfun$save_data_parquet$1$$anonfun$2, name: $outer, type: class algorithm.config.Base$$anonfun$save_data_parquet$1)
    - object (class algorithm.config.Base$$anonfun$save_data_parquet$1$$anonfun$2, <function1>)
    - element of array (index: 25)
    - array (class [Ljava.lang.Object;, size 26)
After a good deal of searching, the conclusion is as follows:
When you run a transformation (map, flatMap, filter, etc.), its closure goes through these steps:
1. It is serialized on the driver node.
2. It is shipped to the appropriate worker nodes in the cluster.
3. It is deserialized on those nodes.
4. It is finally executed on those nodes.
If the custom UDF you define and use is a method of a class, Spark cannot serialize the method by itself, so it tries to serialize the entire enclosing class in order to run the method in the other JVMs. In this case the enclosing class holds a non-serializable SparkContext (the sc field in the stack trace), so serialization fails and the exception above is thrown.
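A minimal sketch of the problematic pattern, reconstructed from the stack trace above (the class name Base, the sc field and the save_data_parquet method come from the trace; the CSV column name and the normalize helper are made-up assumptions, not the original code):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Hypothetical reconstruction of the failing setup: the enclosing class holds a
// SparkContext field, and the UDF body is an instance method, so the lambda
// passed to udf() captures `this` (the $outer field in the serialization stack),
// and `this` drags the non-serializable SparkContext along with it.
class Base {
  val spark: SparkSession = SparkSession.builder().appName("demo").getOrCreate()
  val sc: SparkContext = spark.sparkContext             // SparkContext is not serializable

  // instance method used as the UDF body
  def normalize(s: String): String = s.trim.toLowerCase

  def save_data_parquet(path: String): Unit = {
    val df = spark.read.option("header", "true").csv(path)
    val normalizeUdf = udf((s: String) => normalize(s)) // lambda captures `this`
    df.withColumn("name", normalizeUdf(df("name")))     // column name is assumed
      .write.mode("overwrite").parquet("/tmp/out.parquet") // Task not serializable is thrown here
  }
}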
Solutions:
1. Add the @transient annotation to the val sc = spark.sparkContext field (see the sketch after this list).
2. Make the class extend a serializable type (e.g. mix in Serializable).
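A minimal sketch combining both fixes on the same assumed class from above (in practice they are applied together: @transient keeps the SparkContext out of the serialized closure, and extending Serializable lets Spark ship the rest of the class to the executors):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

class Base extends Serializable {                         // fix 2: the enclosing class is serializable
  val spark: SparkSession = SparkSession.builder().appName("demo").getOrCreate()
  @transient val sc: SparkContext = spark.sparkContext   // fix 1: excluded from serialization

  def normalize(s: String): String = s.trim.toLowerCase

  def save_data_parquet(path: String): Unit = {
    val df = spark.read.option("header", "true").csv(path)
    val normalizeUdf = udf((s: String) => normalize(s))  // capturing `this` is now harmless
    df.withColumn("name", normalizeUdf(df("name")))
      .write.mode("overwrite").parquet("/tmp/out.parquet")
  }
}

After deserialization on an executor the @transient sc field is simply null there, which is fine because the UDF body never touches it; only the driver uses the SparkContext.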