1. 程式人生 > >機器學習 - Mllib大法好

機器學習 - Mllib大法好

基本模型

標籤的注入

注入單個標籤

 //注入標籤
  def labelDemo1() = {
    val vd = Vectors.dense(2, 0, 6)
    val pos = LabeledPoint(1, vd)
    println(pos.features) //內容資料---[2.0,0.0.6.0]
    println(pos.label) //既定標籤--1.0
  }

注入批量標籤

2.3 3.2 1
1.6 2.3 2
5.1 2.1 1
6.2 7.2 1
3.2 1.2 2
9.2 2.1 1

 def labelDemo2(sc: SparkContext) = {
    val data = sc.textFile("D:\\ml\\labeled.txt", 2)
    val datap = data.map { x =>
      val parts = x.split(" ")
      LabeledPoint(parts(2).toDouble, Vectors.dense(parts(0).toDouble, parts(1).toDouble))
    }
    datap.foreach { println } //檢視標籤資料-->(1.0,[2.3,3.2])
    datap.foreach { x => println(x.features) } //檢視特徵資料(自變數)-->[2.3,3.2]
    datap.foreach { x => println(x.label) } //檢視標籤資訊(因變數)-->1.0
  }

向量索引的使用

1 2:1 3:2 6:3 7:4
2 1:1 2:2 3:3
1 1:1 2:3 3:3
1 1:3 2:1 3:3

 /**
   * SVM支援向量機-->監督學習模型
   * 1 2:1 3:2 6:3 7:4
   * 2 1:1 2:2 3:3
   */
  def labelDemo03(sc: SparkContext) = {
    //MLUtils.loadLibSVMFile資料集標記的index是從1開始
    //loadLibSVMFile方法將資料分解成一個稀疏向量進行下一步的操作
    val data = MLUtils.loadLibSVMFile(sc, "D:\\ml\\data.txt")
    //(1.0,(7,[1,2,5,6],[1.0,2.0,3.0,4.0]))->(標簽,(整體總向量,[向量下標],[對應的內容]))
    //向量索引順序:可以跳號,但必須從小到大排列
    data.foreach { println }
  }

矩陣的使用

本地矩陣Matrices的使用

def localDemo01() = {
    //Matrices.dense方法是矩陣重組的呼叫方法
    //第一個引數是新矩陣行數,第二個引數是新矩陣的列數,第三個引數為傳入的資料值。
    val mx = Matrices.dense(2, 3, Array(1, 2, 3, 4, 5, 6))
    println(mx)
  }

行矩陣rowMatrix的使用

1 2 3
4 5 6
7 8 9
10 11 12
1 3 4

  def matrixDemo01(sc: SparkContext) = {
    val data = sc.textFile("D:\\ml\\rowMatrix.txt", 2)
    //轉成Vector格式的資料
    val datap = data.map { _.split(" ").map(_.toDouble) }
    		.map { arr => Vectors.dense(arr) }
    datap.foreach { println }
    val rm = new RowMatrix(datap)
    println(rm.numCols())
    println(rm.numRows())
  }

帶行索引的行矩陣

def matrixDemo02(sc: SparkContext) = {
    val data = sc.textFile("D:\\ml\\rowMatrix.txt", 3)
    var index = 0
    //按分割槽佈置索引,分多少個分割槽就統計多少次,如2個分割槽就有2次index的計算,
    val datap = data.map { _.split(" ").map { _.toDouble } }
    .map { arr => Vectors.dense(arr) }.map { vt =>
      index += 1
      new IndexedRow(index, vt)
    }
    val rm = new IndexedRowMatrix(datap)
    rm.rows.foreach { println } //IndexedRow(3,[1.0,2.0,3.0])
    rm.rows.filter { x => x.index == 1 }.foreach { println }
  }

基本統計的運用

基本統計

1
2
3
4
5

def statisticsDemo01(sc: SparkContext) = {
    val data = sc.textFile("D:\\ml\\testSummary.txt", 2)
      .map { _.split("").map { _.toDouble } }.map { arr => Vectors.dense(arr) }
    val summ = Statistics.colStats(data)
    //向量-->
    //一元向量集合--List([1.0],[2.0],[3.0],[4.0],[5.0])
    data.foreach { println }
    println(summ.mean) //均值--[3.0]
    println(summ.variance) //方差--[2.5]
    println(summ.max) //最大值--[5.0]
    println(summ.min) //最小值--[1.0]
    println(summ.count) //計數--5
  }

皮爾遜相關係數的使用

皮爾遜相關係數的變化範圍為-1到1
係數的值為1意味著X和Y可以很好的由直線方程來描述,所有的資料點都很好的落在一條直線上

testCorrX.txt

1 2 3 4 5 17
2 4 6 8 10

testCorrY.txt

2 4 7 9 10
11 33 66 88 99 22

 def statisticsPearson(sc: SparkContext) = {
    //注意切分用flatMap,且不能分割槽
    val xdata = sc.textFile("D:\\ml\\testCorrX.txt")
     .flatMap { _.split(" ").map { _.toDouble } }
    val ydata = sc.textFile("D:\\ml\\testCorrY.txt")
     .flatMap { _.split(" ").map { _.toDouble } }
    //計算兩組的相關係數->先zip兩個資料
    //可以將換行符識別成空格切分,資料量必須一樣,排列方式可以不一樣
    val correlation = Statistics.corr(xdata, ydata)
    println(correlation)//--0.11877692214047662
  }

最小二乘法

y=x1a1+x2a2+…+xnan+b

最小二乘法(又稱最小平方法)是一種數學優化技術。

它通過最小化誤差的平方和尋找資料的最佳函式匹配。利用最小二乘法可以簡便地求得未知的資料,並使得這些求得的資料與實際資料之間誤差的平方和為最小。最小二乘法還可用於曲線擬合。

其他一些優化問題也可通過最小化能量或最大化熵用最小二乘法來表達。

預測商品需求量

y=β1X1+β2X2+β0

某種商品的需求量(y,噸)、價格(x1,元/千克)和消費者收入(x2,元)觀測值如下所示。

100|5 1000
75|7 600
80|6 1200
70|6 500

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression

object range {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("distance")
    val sc = new SparkContext(conf)
    val sqc = new SQLContext(sc)
    val data = sc.textFile("D:\\ml\\lritem.txt")
    val datap = data.map { x =>
      val parts = x.split("\\|")
      val features = parts(1).split(" ")
      (parts(0).toDouble, features(0).toDouble, features(1).toDouble)
    }
    //轉成DF
    val df = sqc.createDataFrame(datap)
    //定義各列欄位
    val dfData = df.toDF("Y", "X1", "X2")
    //定義features欄位名,宣告除了Y之外都是特徵列
    val feaColum = Array("X1", "X2")
    //向量格式轉換器,指定特徵向量是哪幾列,以及特徵向量對應的別稱,一般用"features"
    val assembler = 
      new VectorAssembler().setInputCols(feaColum).setOutputCol("features")
    //將DataFrame轉成VectorAssemmbler格式
    val vecDF = assembler.transform(dfData)

    //建模,設定自變數和因變數
    //--setFitIntercept(true) 表示是否需要計算截距項
    //--setLabelCol("Y") 指定因變數起名
    //--setFeaturesCol()為特徵列起名
    //--fit(資料)是訓練生成模型
    val model = new LinearRegression().setFeaturesCol("features")
      .setLabelCol("Y").setFitIntercept(true).fit(vecDF)
    //獲取方程係數
    //[-6.497089660950155,0.01631790530613281]
    println("方程係數:" + model.coefficients)
    //獲取截距項係數
    //106.36879716406025
    println("截距項係數:" + model.intercept)
    //獲取模型的R方值(擬合優度判定係數)
    //0.9107147177285084
    //越接近1,擬合越好,0.55以上的都可以接受
    println("R方值係數:" + model.summary.r2)

    //預測資料,測試的資料格式必須是VectorAssemmbler
    val predict = model.transform(vecDF)
    val res = predict.select("prediction")
    val testdata = sc.textFile("D:\\ml\\testItem.txt", 3)
    val testdatap = testdata.map { line =>
      val info = line.split(" ")
      (info(0).toDouble, info(1).toDouble)
    }
    val testDF = sqc.createDataFrame(testdatap).toDF("X1", "X2")
    val testV = assembler.transform(testDF)
    val testres = model.transform(testV)
    testres.show()

    //自定義測試資料
    val datatest = assembler.transform(sqc.createDataFrame(sc.makeRDD(List((8.0, 35))))
    							.toDF("X1", "X2"))
    //--將測試集做預測
    val testPredict = model.transform(datatest)

    testPredict.show()

  }

}

預測謀殺率

615, 3624, 2.1, 69.05, 0, 41.3, 20, 50708
65, 6315, 1.5, 69.31, 0, 66.7, 152, 566432
212, 4530, 1.8, 70.55, 0, 58.1, 15, 113417
110, 3378, 1.9, 70.66, 0, 39.9, 65, 51945
1198, 5114, 1.1, 71.71,0, 62.6, 20, 156361
541, 4884, 0.7, 72.06, 0, 63.9, 166, 103766

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression

object kill {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("kill")
    val sc= new SparkContext(conf)
    val sqc= new SQLContext(sc)
    val data = sc.textFile("D:\\killsample.txt")
    
    val datap=data.map { line => 
    val infos=line.split(",")
    (infos(0).toDouble,infos(1).toDouble,infos(2).toDouble,infos(3).toDouble,
        infos(4).toDouble,infos(5).toDouble,infos(6).toDouble,infos(7).toDouble)   
    }
    val df=sqc.createDataFrame(datap).toDF(
        "Population", "Income", "Illiteracy", "LifeExp", "Murder", "HSGrad", "Frost", "Area"
    )
    val colArray=Array(
        "Population", "Income", "Illiteracy", "LifeExp", "HSGrad", "Frost", "Area")
    val ass=new VectorAssembler().setInputCols(colArray).setOutputCol("features")
    val vecDF=ass.transform(df)
    
    //建立模型
    val lr1=new LinearRegression()
    val lr2=lr1.setFitIntercept(true).setLabelCol("Murder").setFeaturesCol("features")
    //正則化,防止過擬合
    val lr3=lr2.setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
    val lr=lr3
    val model=lr.fit(vecDF)
    println("方程係數:"+model.coefficients)
    println("截距係數:"+model.intercept)
    println("R方差係數:"+model.summary.r2)
    //println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")
    
    val dataT = sc.textFile("D:\\ml\\lrmurder-test.txt")
    val dataTp=dataT.map { line => 
    val infos=line.split(",")
    (infos(0).toDouble,infos(1).toDouble,infos(2).toDouble,infos(3).toDouble,
        infos(4).toDouble,infos(5).toDouble,infos(6).toDouble,infos(7).toDouble)   
    }
    val testDF=sqc.createDataFrame(dataTp).toDF("Population", "Income", "Illiteracy", "LifeExp","Murder", "HSGrad", "Frost", "Area")
    val testV=ass.transform(testDF)
    val testres=model.transform(testV)
   testres.createTempView("res")
   sqc.sql("select r1.*,r2.Murder act,abs(r1.Murder-r1.prediction) intercept from res r1,res r2 where r1.features=r2.features order by r1.features").show()
   
     
  }
}

梯度下降法

在微積分裡面,對多元函式的引數求∂偏導數,把求得的各個引數的偏導數以向量的形式寫出來,就是梯度

他的意義從幾何意義上講,就是函式變化最快的地方。具體來說,對於函式f(x,y),在點(x0,y0),沿著梯度向量的方向就是(∂f/∂x0, ∂f/∂y0)T的方向是f(x,y)增加最快的地方。或者說,沿著梯度向量的方向,更加容易找到函式的最大值。反過來說,沿著梯度向量相反的方向,也就是 -(∂f/∂x0, ∂f/∂y0)T的方向,梯度減少最快,也就是更加容易找到函式的最小值。