1. 程式人生 > >Spark隨機森林演算法實踐

Spark隨機森林演算法實踐

    1. 例子1

object RunRF {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("rf")

    val sc = new SparkContext(sparkConf)

    //讀取資料

    val rawData = sc.textFile("hdfs://192.168.1.64:8020/test/mllib/v3.csv"

)

    val data = rawData.map{ line =>

      val values = line.split(",").map(_.toDouble)

      //init返回除了最後一個元素的所有元素,作為特徵向量

      //Vectors.dense向量化,dense密集型

      val feature = Vectors.dense(values.init)

      val label = values.last

      LabeledPoint(label, feature)

    }

    //訓練集、交叉驗證集和測試集,各佔80%10%10%

    //10%的交叉驗證資料集的作用是確定在訓練資料集上訓練出來的模型的最好引數

    //測試資料集的作用是評估CV資料集的最好引數

    val

Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))

    trainData.cache()

    cvData.cache()

    testData.cache()

 

    //構建隨機森林

    val model = RandomForest.trainClassifier(trainData, 2, Map[Int, Int](), 20, "auto", "gini", 4, 32)

    val metrics = getMetrics(model, cvData)

    println("-----------------------------------------confusionMatrix-----------------------------------------------------")

    //混淆矩陣和模型精確率

    println(metrics.confusionMatrix)

    println("---------------------------------------------precision-------------------------------------------------")

    println(metrics.precision)

 

    println("-----------------------------------------(precision,recall)---------------------------------------------------")

    //每個類別對應的精確率與召回率

    (0 until 2).map(target => (metrics.precision(target), metrics.recall(target))).foreach(println)

    //儲存模型

    model.save(sc,"hdfs://192.168.1.64:8020/tmp/RFModel")

 

  }

 

/**

    * @param model 隨機森林模型

    * @param data  用於交叉驗證的資料集

    * */

  def getMetrics(model: RandomForestModel, data: RDD[LabeledPoint]): MulticlassMetrics = {

    //將交叉驗證資料集的每個樣本的特徵向量交給模型預測,並和原本正確的目標特徵組成一個tuple

    val predictionsAndLables = data.map { d =>

      (model.predict(d.features), d.label)

    }

    //將結果交給MulticlassMetrics,其可以以不同的方式計算分配器預測的質量

    new MulticlassMetrics(predictionsAndLables)

  }

 

/**

    * 在訓練資料集上得到最好的引數組合

    * @param trainData 訓練資料集

    * @param cvData 交叉驗證資料集

    * */

  def getBestParam(trainData: RDD[LabeledPoint], cvData: RDD[LabeledPoint]): Unit = {

    val evaluations = for (impurity <- Array("gini", "entropy");

                           depth <- Array(1, 20);

                           bins <- Array(10, 300)) yield {

      val model = RandomForest.trainClassifier(trainData, 2, Map[Int, Int](), 20, "auto", impurity, depth, bins)

// 2classes

// 20: numTrees

// auto:subSampleStratry

      val metrics = getMetrics(model, cvData)

      ((impurity, depth, bins), metrics.precision)

    }

    evaluations.sortBy(_._2).reverse.foreach(println)

  }

 

/**

    * 模擬對新資料進行預測1

    */

  val rawData = sc.textFile("hdfs://192.168.1.64:8020/test/mllib/v3.csv")

 

  val pdata = rawData.map{ line =>

    val values = line.split(",").map(_.toDouble)

    //轉化為向量並去掉標籤(init去掉最後一個元素,即去掉標籤)

    val feature = Vectors.dense(values.init)

    feature

  }

  //讀取模型

  val rfModel = RandomForestModel.load(sc,"hdfs://192.168.1.64:8020/tmp/RFModel")

  //進行預測

  val preLabel = rfModel.predict(pdata)

  preLabel.take(10)

  /**

    * 模擬對新資料進行預測2

    *

    */

  val dataAndPreLable = rawData.map{ line =>

    //轉化為向量並去掉標籤(init去掉最後一個元素,即去掉標籤)

    val vecData = Vectors.dense(line.split(",").map(_.toDouble).init)

    val preLabel = rfModel.predict(vecData)

    line + "," + preLabel

  }//.saveAsTextFile("....")

dataAndPreLable.take(10)

}

 

    1. 例子2:處理Hive資料

val hc = new HiveContext(sc)

    import hc.implicits._

    // 呼叫HiveContext

 

    // 取樣本,樣本的第一列為label0或者1),其他列可能是姓名,手機號,以及真正要參與訓練的特徵columns

    val data = hc.sql(s"""select  *  from database1.traindata_userprofile""".stripMargin)

    //提取schema,也就是表的column namedrop2)刪掉1,2列,只保留特徵列

 

    val schema = data.schema.map(f=>s"${f.name}").drop(2)

 

    //MLVectorAssembler是一個transformer,要求資料型別不能是string,將多列資料轉化為單列的向量列,比如把ageincome等等欄位列合併成一個 userFea 向量列,方便後續訓練

    val assembler = new VectorAssembler().setInputCols(schema.toArray).setOutputCol("userFea")

    val userProfile = assembler.transform(data.na.fill(-1e9)).select("label","userFea")

    val data_train = userProfile.na.fill(-1e9)

    // 取訓練樣本

    val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(userProfile)

    val featureIndexer = new VectorIndexer().setInputCol("userFea").setOutputCol("indexedFeatures").setMaxCategories(4).fit(userProfile)

 

    // Split the data into training and test sets (30% held out for testing).

    val Array(trainingData, testData) = userProfile.randomSplit(Array(0.7, 0.3))

    // Train a RandomForest model.

    val rf = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")

    rf.setMaxBins(32).setMaxDepth(6).setNumTrees(90).setMinInstancesPerNode(4).setImpurity("gini")

    // Convert indexed labels back to original labels.

    val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

 

    val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

 

    // Train model. This also runs the indexers.

    val model = pipeline.fit(trainingData)

    println("training finished!!!!")

    // Make predictions.

    val predictions = model.transform(testData)

 

    // Select example rows to display.

    predictions.select("predictedLabel", "indexedLabel", "indexedFeatures").show(5)

 

    val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")

    val accuracy = evaluator.evaluate(predictions)

    println("Test Error = " + (1.0 - accuracy))