
Task not serializable when using a custom UDF on a Spark cluster

On a Spark cluster, a DataFrame was built from a CSV file and one of its columns needed to be transformed; the built-in functions could no longer meet the requirement.

So a custom UDF had to be written, but using it raised the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@7334dcbe)
    - field (class: algorithm.config.Base, name: sc, type: class org.apache.spark.SparkContext)
    - object (class algorithm.config.Base, algorithm.config.Base@4a6c33a0)
    - field (class: algorithm.config.Base$$anonfun$save_data_parquet$1, name: $outer, type: class algorithm.config.Base)
    - object (class algorithm.config.Base$$anonfun$save_data_parquet$1, <function1>)
    - field (class: algorithm.config.Base$$anonfun$save_data_parquet$1$$anonfun$2, name: $outer, type: class algorithm.config.Base$$anonfun$save_data_parquet$1)
    - object (class algorithm.config.Base$$anonfun$save_data_parquet$1$$anonfun$2, <function1>)
    - element of array (index: 25)
    - array (class [Ljava.lang.Object;, size 26)
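
The code that triggered this presumably looked something like the sketch below. This is a hypothetical reconstruction based only on the names visible in the serialization stack (algorithm.config.Base, its sc field, and save_data_parquet); the column names, paths, and UDF body are made up for illustration.

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    class Base {                                      // note: not Serializable
      val spark: SparkSession = SparkSession.builder().getOrCreate()
      val sc: SparkContext = spark.sparkContext       // SparkContext is never serializable

      val prefix = "id_"                              // hypothetical field used by the UDF

      def save_data_parquet(in: String, out: String): Unit = {
        // The UDF body reads `prefix`, i.e. `this.prefix`, so the closure
        // captures the whole Base instance -- including the sc field above.
        val addPrefix = udf((s: String) => prefix + s)
        val df = spark.read.option("header", "true").csv(in)
        df.withColumn("id", addPrefix(df("id"))).write.parquet(out)
      }
    }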

After half a day of searching on Baidu, the takeaway is as follows:

When you run any transformation (map, flatMap, filter, and so on), the closure you pass in goes through the following steps (illustrated by the sketch after this list):
1. It is serialized on the driver node.
2. It is shipped to the appropriate nodes in the cluster.
3. It is deserialized on those nodes.
4. Finally, it is executed on those nodes.
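
As a minimal, self-contained illustration of these steps going wrong (all names here are hypothetical, not from the original post): a closure that reads a field of its enclosing class captures `this`, so the entire instance must survive step 1.

    import org.apache.spark.SparkContext

    class Multiplier {                     // not Serializable
      val factor = 10
      def run(sc: SparkContext): Long =
        // `_ * factor` really means `_ * this.factor`, so Spark must
        // serialize the whole Multiplier instance to ship the closure.
        // Serialization fails on the driver (step 1) with
        // "Task not serializable" before anything runs on the cluster.
        sc.parallelize(1 to 100).map(_ * factor).count()
    }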

When you define a UDF inside a class and then use it, Spark cannot serialize just that method; it has to serialize the whole enclosing class so the method can run on other JVMs. Here the class holds a SparkContext field (sc), which is not serializable, so that attempt fails and the exception above is thrown.

Solutions (a sketch applying both follows the list):

1. Add @transient in front of val sc = spark.sparkContext.

2. Make the class extend Serializable.
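
A minimal sketch applying both fixes to the hypothetical Base class from above:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession

    // Fix 2: extend Serializable so Spark can serialize the enclosing class.
    class Base extends Serializable {
      val spark: SparkSession = SparkSession.builder().getOrCreate()

      // Fix 1: @transient tells the serializer to skip this field entirely;
      // the SparkContext is only needed on the driver anyway, so workers
      // receive a Base instance with sc left unset.
      @transient val sc: SparkContext = spark.sparkContext

      // ... save_data_parquet as before ...
    }

With sc excluded from serialization and the rest of the class serializable, the closure created inside save_data_parquet can be shipped to the executors without hitting NotSerializableException.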