Solving a Spark serialization problem
My company has recently been building an e-commerce recommendation system. One part of it is a rating-conversion feature: user behavior on the e-commerce site is converted into rating data, which is then fed into the algorithms provided by Spark MLlib.
While implementing the rating conversion I ran into a serialization problem, so today I am writing up a proper summary of serialization issues in Spark.
Spark version: 2.1.0, with the following Maven dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
Let's start with the crudest version of the code.
import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession}

/**
  * Created by xiaoxiao_zhang on 2017/5/9.
  * Rating conversion:
  * convert user behavior into the corresponding rating
  * (browse, favorite, add to cart, purchase)
  */
/**
  * Created by xiaoxiao_zhang on 2017/5/11.
  */
class ScoreConvert1(sc: SparkContext) extends Serializable {

  // def covert2Score(sourceRdd: RDD[String]) = {
  def covert2Score() = {
    val sourceRdd: RDD[String] = sc.textFile("data/tianchi_mobile_recommend_train_user.csv")
    val scoreRdd = sourceRdd.map(t => {
      val splits = t.split(",")
      if (splits.length >= 3) {
        val actionScore = splits(2) match {
          case "1" => 1
          case "2" => 3
          case "3" => 6
          case "4" => 8
          case _ => 0.0
        }
        Array(splits(0), splits(1), actionScore).mkString(",")
      } else {
        null
      }
    })
    scoreRdd
  }

  def parseRating(str: String): Rating1 = {
    val fields = str.split(",")
    Rating1(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }

  def fillWithScore(scores: RDD[String], spark: SparkSession) {
    import spark.implicits._
    val ratings = scores
      .map(parseRating)
      .toDF()

    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("itemId")
      .setRatingCol("rating")
    val model: ALSModel = als.fit(training)

    model.itemFactors.show()
    println("-----1111-------")

    val userFactors = model.userFactors
    val itemFactors = model.itemFactors
    userFactors.show(truncate = false)
    println("------2222-------")
    model.itemFactors.show(truncate = false)

    val joinedDF = userFactors.join(itemFactors, Seq("features"), "inner")
    println("------3333-------")
    joinedDF.show(10)
  }
}

object ScoreConvert1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ScoreConvert").setMaster("local[5]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")

    val props = new Properties()
    props.load(new FileInputStream(Constant.profileName))

    val spark = SparkSession.builder().getOrCreate()
    val sourceRdd: RDD[String] = sc.textFile("data/tianchi_mobile_recommend_train_user.csv")

    val convert: ScoreConvert1 = new ScoreConvert1(sc)
    val scores: RDD[String] = convert.covert2Score()
    println("scores-----" + scores.take(3).toList)
    convert.fillWithScore(scores, spark)
  }
}

case class Rating1(userId: Int, itemId: Int, rating: Float)
Running it immediately fails with the following error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at com.allyes.awise.eng.score.ScoreConvert1.fillWithScore(ScoreConvert1.scala:53)
    at com.allyes.awise.eng.score.ScoreConvert1$.main(ScoreConvert1.scala:97)
    at com.allyes.awise.eng.score.ScoreConvert1.main(ScoreConvert1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@63649129)
    - field (class: com.allyes.awise.eng.score.ScoreConvert1, name: sc, type: class org.apache.spark.SparkContext)
    - object (class com.allyes.awise.eng.score.ScoreConvert1, com.allyes.awise.eng.score.ScoreConvert1@2423696d)
    - field (class: com.allyes.awise.eng.score.ScoreConvert1$$anonfun$2, name: $outer, type: class com.allyes.awise.eng.score.ScoreConvert1)
    - object (class com.allyes.awise.eng.score.ScoreConvert1$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 17 more

Process finished with exit code 1
The error is clear: SparkContext does not implement Serializable. SparkContext is the entry point of a Spark program; it lives only on the driver and is never meant to be serialized and shipped to executors. Let's look at the line the stack trace points to:
def fillWithScore(scores: RDD[String], spark: SparkSession) {
  import spark.implicits._
  val ratings = scores
    .map(parseRating)
    .toDF()

Inside the RDD's map we call the parseRating method, and all that method does is split the string and wrap the fields in a case class, which looks perfectly harmless.
After talking it over with colleagues and reading some material online, it became clear that any external object referenced by code running on the executors must be serializable. Here, parseRating is a member method of ScoreConvert1.scala, and fillWithScore is a member method of the same class.
When an executor calls parseRating, the closure captures the enclosing ScoreConvert1 instance, so every member variable, member method, and external variable reachable from that class must be serializable as well. In other words, even though ScoreConvert1 itself extends Serializable, its constructor argument sc, a SparkContext, does not, and that is why it blows up!
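To make the capture concrete: scores.map(parseRating) is just shorthand for scores.map(str => this.parseRating(str)), so the function object holds a reference to this (the ScoreConvert1 instance), which in turn holds sc. Below is a stripped-down sketch of the same failure; the class name Holder and the method double are made up for illustration and are not part of the project:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical minimal reproduction of the same "Task not serializable" error.
class Holder(sc: SparkContext) extends Serializable {

  // A member method: using it inside map captures `this`, and with it `sc`.
  def double(x: Int): Int = x * 2

  def run(): Array[Int] = {
    val rdd = sc.parallelize(1 to 10)
    // Desugars to rdd.map(x => this.double(x)); serializing the task tries to
    // serialize `this`, whose field `sc` is a non-serializable SparkContext.
    rdd.map(double).collect() // throws SparkException: Task not serializable
  }
}

object Holder {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Holder").setMaster("local[2]"))
    new Holder(sc).run()
  }
}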
Now that the cause is found, let's fix it.
Option 1: move what parseRating does directly into the map lambda; then ScoreConvert1.scala does not need to be serializable at all.
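The post does not show the code for Option 1, so here is a minimal sketch of what it could look like inside ScoreConvert1, assuming the same Rating1 case class and the same CSV layout; only the beginning of fillWithScore changes, and the ALS part stays as before:

def fillWithScore(scores: RDD[String], spark: SparkSession) {
  import spark.implicits._
  // Parse inline: the lambda only touches its own argument and the top-level
  // case class Rating1, so the enclosing ScoreConvert1 instance is never
  // pulled into the closure and does not have to be serializable.
  val ratings = scores
    .map { str =>
      val fields = str.split(",")
      Rating1(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
    }
    .toDF()

  // ... the rest of the method (randomSplit, ALS training, joins) is unchanged
}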
Option 2: keep using the parseRating function. In that case every member variable, member method, and external variable reachable from ScoreConvert1.scala must be serializable, so move the data-reading step out of the class and pass the RDD in as a parameter, while ScoreConvert1.scala keeps extending Serializable. The changes are shown below.
Also, at this point there is no reason to pass sc into ScoreConvert1 any more, because nothing inside the class uses it.
The improved version:
import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession}

/**
  * Created by xiaoxiao_zhang on 2017/5/9.
  * Rating conversion:
  * convert user behavior into the corresponding rating
  * (browse, favorite, add to cart, purchase)
  */
/**
  * Created by xiaoxiao_zhang on 2017/5/11.
  */
class ScoreConvert1 extends Serializable {

  def covert2Score(sourceRdd: RDD[String]) = {
    val scoreRdd = sourceRdd.map(t => {
      val splits = t.split(",")
      if (splits.length >= 3) {
        val actionScore = splits(2) match {
          case "1" => 1
          case "2" => 3
          case "3" => 6
          case "4" => 8
          case _ => 0.0
        }
        Array(splits(0), splits(1), actionScore).mkString(",")
      } else {
        null
      }
    })
    scoreRdd
  }

  def parseRating(str: String): Rating1 = {
    val fields = str.split(",")
    Rating1(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }

  def fillWithScore(scores: RDD[String], spark: SparkSession) {
    import spark.implicits._
    val ratings = scores
      .map(parseRating)
      .toDF()

    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("itemId")
      .setRatingCol("rating")
    val model: ALSModel = als.fit(training)

    model.itemFactors.show()
    println("-----1111-------")

    val userFactors = model.userFactors
    val itemFactors = model.itemFactors
    userFactors.show(truncate = false)
    println("------2222-------")
    model.itemFactors.show(truncate = false)

    val joinedDF = userFactors.join(itemFactors, Seq("features"), "inner")
    println("------3333-------")
    joinedDF.show(10)
  }
}

object ScoreConvert1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ScoreConvert").setMaster("local[5]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")

    val props = new Properties()
    props.load(new FileInputStream(Constant.profileName))

    val spark = SparkSession.builder().getOrCreate()
    val sourceRdd: RDD[String] = sc.textFile("data/tianchi_mobile_recommend_train_user.csv")

    val convert: ScoreConvert1 = new ScoreConvert1()
    val scores: RDD[String] = convert.covert2Score(sourceRdd)
    println("scores-----" + scores.take(3).toList)
    convert.fillWithScore(scores, spark)
  }
}

case class Rating1(userId: Int, itemId: Int, rating: Float)
Summary: whenever code inside an RDD operation such as .map or .filter references an external variable, that variable must be serializable. The external reference may be a member variable of the class the function lives in, a member method of that class, or any other external object captured through the constructor.
Otherwise, move the work into the lambda passed to map itself, so that no enclosing object needs to be serialized. With that, the serialization problem that had been bothering me for two days is solved.
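A related idiom that this post does not use, but that follows from the same rule: if a closure only needs the value of a class member, copy it into a local val first, so that only that value is captured instead of the whole enclosing object. A minimal sketch, with an illustrative class name and field:

import org.apache.spark.rdd.RDD

// Illustrative class; note it does NOT extend Serializable.
class Multiplier(factor: Int) {

  def scale(data: RDD[Int]): RDD[Int] = {
    // data.map(_ * factor) would compile to this.factor and capture `this`,
    // making Spark try to serialize the whole Multiplier instance, which fails.
    // Copying the value into a local val means only an Int ends up in the closure.
    val localFactor = factor
    data.map(_ * localFactor)
  }
}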
Written on 2017-05-11, during the recommendation-system project.