Spark MLlib 入門學習筆記
關於邏輯迴歸的演算法原理 Spark官方文件裡有說明,另外網上也有中文翻譯文件可參考。本筆記是學習MLlib的輯迴歸API使用時一道練習題記錄,通過這道練習,可以掌握基本使用。MLLib提供了兩種演算法實現,分別是SGD梯度下降法和LBFGS。
1. 資料檔案
交通事故的統計檔案,四列,accident(去年是否出過事故,1表示出過事故,0表示沒有),age(年齡 數值型),vision(視力狀況,分型別,1表示好,0表示有問題),drive(駕車教育,分型別,1表示參加過駕車教育,0表示沒有)。第1列是因變數,其它3列是特徵。這是一個用空格分隔的文字檔案,要使用MLLib演算法庫,首先要讀檔案並轉成LabeledPoint資料型別的RDD。
2. SGD演算法1 17 1 1 1 44 0 0 1 48 1 0 1 55 0 0 1 75 1 1 0 35 0 1 0 42 1 1 0 57 0 0 0 28 0 1 0 20 0 1 0 38 1 0 0 45 0 1 0 47 1 1 0 52 0 0 0 55 0 1 1 68 1 0 1 18 1 0 1 68 0 0 1 48 1 1 1 17 0 0 1 70 1 1 1 72 1 0 1 35 0 1 1 19 1 0 1 62 1 0 0 39 1 1 0 40 1 1 0 55 0 0 0 68 0 1 0 25 1 0 0 17 0 0 0 45 0 1 0 44 0 1 0 67 0 0 0 55 0 1 1 61 1 0 1 19 1 0 1 69 0 0 1 23 1 1 1 19 0 0 1 72 1 1 1 74 1 0 1 31 0 1 1 16 1 0 1 61 1 0
package classify /* accident.txt accident(去年是否出過事故,1表示出過事故,0表示沒有) age(年齡 數值型) vision(視力狀況,分型別,1表示好,0表示有問題) drive(駕車教育,分型別,1表示參加過駕車教育,0表示沒有) */ import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.classification.LogisticRegressionWithSGD import org.apache.spark.{SparkConf, SparkContext} object LogisticSGD { def parseLine(line: String): LabeledPoint = { val parts = line.split(" ") val vd: Vector = Vectors.dense(parts(1).toDouble, parts(2).toDouble, parts(3).toDouble) return LabeledPoint(parts(0).toDouble, vd ) } def main(args: Array[String]){ val conf = new SparkConf().setMaster(args(0)).setAppName("LogisticSGD") val sc = new SparkContext(conf) val data = sc.textFile(args(1)).map(parseLine(_)) val splits = data.randomSplit(Array(0.6, 0.4), seed=11L) val trainData = splits(0) val testData = splits(1) val model = LogisticRegressionWithSGD.train(trainData, 50) println(model.weights.size) println(model.weights) println(model.weights.toArray.filter(_ != 0).size) val predictionAndLabel = testData.map(p => (model.predict(p.features), p.label)) predictionAndLabel.foreach(println) } }
parseLine函式將文字檔案的每一行轉成一個LabeledPoint資料型別,randomSplit用例把資料集分成訓練和測試兩部分。val model = LogisticRegressionWithSGD.train(trainData, 50) 執行訓練並得到模型,這裡的50為迭代次數。val predictionAndLabel = testData.map(p => (model.predict(p.features), p.label))中的model.predict執行預測,testData.map測試資料集的特徵值傳遞給model去預測,並將預測值與原有的label合併形成一個新的map。
3. LBFGS演算法
package classify
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
object LogisticLBFGS {
def parseLine(line: String): LabeledPoint = {
val parts = line.split(" ")
val vd: Vector = Vectors.dense(parts(1).toDouble, parts(2).toDouble, parts(3).toDouble)
return LabeledPoint(parts(0).toDouble, vd )
}
def main(args: Array[String]){
val conf = new SparkConf().setMaster(args(0)).setAppName("LogisticLBFGS")
val sc = new SparkContext(conf)
val data = sc.textFile(args(1)).map(parseLine(_))
val splits = data.randomSplit(Array(0.6, 0.4), seed=11L)
val trainData = splits(0)
val testData = splits(1)
val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainData)
println(model.weights.size)
println(model.weights)
println(model.weights.toArray.filter(_ != 0).size)
val prediction = testData.map(p => (model.predict(p.features), p.label))
//println(prediction)
prediction.foreach(println)
}
}
val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainData)中的setNumClasses(2)設定分類數。
對於這個列子,LBFGS的效果比SGD的效果好。