
Solving Spark serialization problems

My company has recently been building an e-commerce recommendation system. One part of it is a score-conversion step: user behavior on the e-commerce site is converted into rating data, which is then fed into the algorithms provided by Spark MLlib.

While implementing the score conversion I ran into a serialization problem, so today I took the time to sort out how serialization works in Spark.

Spark version: 2.1.0, with the following Maven dependencies:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>

Let's start with the crudest version of the code.

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession}

/**
  * Created by xiaoxiao_zhang on 2017/5/9.
  * Score conversion:
  * convert user behaviour (browse, favourite, add to cart, purchase) into the corresponding rating.
  *
  */
/**
  * Created by xiaoxiao_zhang on 2017/5/11.
  */
class ScoreConvert1(sc: SparkContext) extends Serializable {

  // def covert2Score(sourceRdd: RDD[String]) = {
  def covert2Score() = {
    val sourceRdd: RDD[String] = sc.textFile("data/tianchi_mobile_recommend_train_user.csv")
    val scoreRdd = sourceRdd.map(t => {
      val splits = t.split(",")
      if (splits.length >= 3) {
        val actionScore = splits(2) match {
          case "1" => 1
          case "2" => 3
          case "3" => 6
          case "4" => 8
          case _ => 0.0
        }
        Array(splits(0), splits(1), actionScore).mkString(",")
      } else {
        null
      }
    })
    scoreRdd
  }

  def parseRating(str: String): Rating1 = {
    val fields = str.split(",")
    Rating1(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }

  def fillWithScore(scores: RDD[String], spark: SparkSession) {
    import spark.implicits._
    val ratings = scores
      .map(parseRating)
      .toDF()

    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("itemId")
      .setRatingCol("rating")
    val model: ALSModel = als.fit(training)
    model.itemFactors.show()

    println("-----1111-------")
    val userFactors = model.userFactors
    val itemFactors = model.itemFactors

    userFactors.show(truncate = false)
    println("------2222-------")
    model.itemFactors.show(truncate = false)

    val joinedDF = userFactors.join(itemFactors, Seq("features"), "inner")
    println("------3333-------")
    joinedDF.show(10)
  }
}

object ScoreConvert1 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ScoreConvert").setMaster("local[5]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val props = new Properties()
    props.load(new FileInputStream(Constant.profileName))

    val spark = SparkSession.builder().getOrCreate()

    val sourceRdd: RDD[String] = sc.textFile("data/tianchi_mobile_recommend_train_user.csv")
    val convert: ScoreConvert1 = new ScoreConvert1(sc)
    val scores: RDD[String] = convert.covert2Score()
    println("scores-----" + scores.take(3).toList)
    convert.fillWithScore(scores, spark)
  }
}

case class Rating1(userId: Int, itemId: Int, rating: Float)

Running this immediately fails with the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at com.allyes.awise.eng.score.ScoreConvert1.fillWithScore(ScoreConvert1.scala:53)
at com.allyes.awise.eng.score.ScoreConvert1$.main(ScoreConvert1.scala:97)
at com.allyes.awise.eng.score.ScoreConvert1.main(ScoreConvert1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@63649129)
- field (class: com.allyes.awise.eng.score.ScoreConvert1, name: sc, type: class org.apache.spark.SparkContext)
- object (class com.allyes.awise.eng.score.ScoreConvert1, com.allyes.awise.eng.score.ScoreConvert1@2423696d)
- field (class: com.allyes.awise.eng.score.ScoreConvert1$$anonfun$2, name: $outer, type: class com.allyes.awise.eng.score.ScoreConvert1)
- object (class com.allyes.awise.eng.score.ScoreConvert1$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 17 more

Process finished with exit code 1

The error is clear: SparkContext does not implement serialization. SparkContext is the entry point of a Spark program; it lives only on the driver and is never meant to be serialized and shipped to the executors. Let's look at the line the stack trace points to (ScoreConvert1.scala:53 in fillWithScore):

def fillWithScore(scores: RDD[String], spark: SparkSession) {
  import spark.implicits._
  val ratings = scores
    .map(parseRating)
    .toDF()
After map on the RDD, what gets called is the parseRating method, and that method merely splits the string and wraps the fields into a case class, so at first glance nothing looks wrong.

After talking it over with colleagues and reading some material online, I learned that any external object used by code running on the executors must be serializable. Here, parseRating is a member method of the class ScoreConvert1.scala, and fillWithScore is another member method of the same class.

When an executor invokes parseRating, the member variables, member methods, and externally captured variables of the class that the method belongs to must all be serializable. In other words, even though ScoreConvert1 itself implements Serializable, its constructor parameter sc of type SparkContext does not, and that is why the job fails!
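To see the mechanism in isolation, here is a stripped-down sketch (the class and method names are made up for illustration): merely referring to a member method inside map() expands to this.parse, so the whole instance, including its non-serializable SparkContext field, gets pulled into the task closure.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical minimal reproduction of the same trap (names are made up).
class Converter(sc: SparkContext) extends Serializable { // sc itself is NOT serializable
  def parse(s: String): Int = s.split(",").length        // an ordinary member method

  def run(rdd: RDD[String]): Array[Int] =
    // `parse` really means `this.parse`, so shipping this closure to the
    // executors means serializing the whole Converter instance, including
    // its SparkContext field => Task not serializable.
    rdd.map(parse).collect()
}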

Now that the cause is clear, let's fix it.

Approach 1: move what parseRating does directly into the map closure; then ScoreConvert1.scala does not need to be serializable at all (see the sketch right below).
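A minimal sketch of what approach 1 could look like, reusing the existing parseRating logic but written inline so the lambda references nothing from the enclosing class:

def fillWithScore(scores: RDD[String], spark: SparkSession): Unit = {
  import spark.implicits._
  // The parsing now happens inside the closure itself; the lambda only touches
  // its own argument and the top-level case class Rating1, so no ScoreConvert1
  // instance is captured and nothing else has to be serializable.
  val ratings = scores.map { str =>
    val fields = str.split(",")
    Rating1(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }.toDF()
  // ... the ALS training code below stays exactly the same
}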


Approach 2: keep using the parseRating function. In that case every member variable, member method, and externally captured variable of ScoreConvert1.scala must be serializable. We can get there by moving the data-loading step out of the class and passing the RDD in as a parameter, while ScoreConvert1.scala itself extends Serializable. The changes are shown in the improved code below.



Also, passing sc into ScoreConvert1 no longer serves any purpose, because the class no longer uses it, so the constructor parameter can be dropped.

The improved version:

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession}

/**
  * Created by xiaoxiao_zhang on 2017/5/9.
  * Score conversion:
  * convert user behaviour (browse, favourite, add to cart, purchase) into the corresponding rating.
  *
  */
/**
  * Created by xiaoxiao_zhang on 2017/5/11.
  */
class ScoreConvert1 extends Serializable {

  def covert2Score(sourceRdd: RDD[String]) = {
    val scoreRdd = sourceRdd.map(t => {
      val splits = t.split(",")
      if (splits.length >= 3) {
        val actionScore = splits(2) match {
          case "1" => 1
          case "2" => 3
          case "3" => 6
          case "4" => 8
          case _ => 0.0
        }
        Array(splits(0), splits(1), actionScore).mkString(",")
      } else {
        null
      }
    })
    scoreRdd
  }

  def parseRating(str: String): Rating1 = {
    val fields = str.split(",")
    Rating1(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }

  def fillWithScore(scores: RDD[String], spark: SparkSession) {
    import spark.implicits._
    val ratings = scores
      .map(parseRating)
      .toDF()

    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("itemId")
      .setRatingCol("rating")
    val model: ALSModel = als.fit(training)
    model.itemFactors.show()

    println("-----1111-------")
    val userFactors = model.userFactors
    val itemFactors = model.itemFactors

    userFactors.show(truncate = false)
    println("------2222-------")
    model.itemFactors.show(truncate = false)

    val joinedDF = userFactors.join(itemFactors,Seq("features"),"inner")
    println("------3333-------")
    joinedDF.show(10)
  }
}


object ScoreConvert1 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ScoreConvert").setMaster("local[5]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val props = new Properties()
    props.load(new FileInputStream(Constant.profileName))

    val spark = SparkSession.builder().getOrCreate()

    val sourceRdd: RDD[String] = sc.textFile("data/tianchi_mobile_recommend_train_user.csv")
    val convert: ScoreConvert1 = new ScoreConvert1()
    val scores: RDD[String] = convert.covert2Score(sourceRdd)
    println("scores-----" + scores.take(3).toList)
    convert.fillWithScore(scores, spark)

  }
}

case class Rating1(userId: Int, itemId: Int, rating: Float)

To sum up: whatever you want to use inside an RDD operation such as .map or .filter must be serializable if it comes from outside the closure. That external dependency may be a member variable of the class the function lives in, a member method of that class, or any other external variable captured through the class constructor.

Otherwise, move the work the function does directly into the map closure, so that no object has to be serialized at all. With that, the serialization problem that had been bothering me for two days is finally solved.
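One more pattern follows from the same rule; it is not used in the fix above, so take it as a sketch only: if the parsing function is defined on a companion object rather than on the class instance, the closure passed to map never captures the enclosing instance, and the class does not need to be Serializable either.

// Sketch only: parseRating lives on the companion object, so
// scores.map(ScoreConvert1.parseRating) references the object (re-initialized
// on each executor JVM) instead of capturing a ScoreConvert1 instance.
object ScoreConvert1 {
  def parseRating(str: String): Rating1 = {
    val fields = str.split(",")
    Rating1(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }
}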

Written on 2017-05-11, for the recommendation system project.