1. 程式人生 > >spark MLlib 之構建機器學習系統

spark MLlib 之構建機器學習系統

構建 spark 機器學習系統

spark 機器學習系統架構

spark系統架構

spark 和 hadoop 叢集的安裝

請參照下面的連結

spark-shell

  • 1.x Spark-Shell 自動建立一個 SparkContext 物件 sc
  • 2.x Spark-Shell 引入了 SparkSession 物件(spark),執行 Spark-Shell 會自動建立一個 SparkSession 物件,在輸入 spark、SparkContext、SQLContext 都已經封裝在 SparkSession 物件當中,它為使用者提供了統一的的切入點,同時提供了各種DataFrame 和 DataSet 的API。

載入資料

  • 原始資料。每個欄位以 “,” 分割,源資料本來是以 “|” 分割的,但是實際處理中發現欄位不能是Int(scala StructField 程式碼段,欄位型別為IntegerType) 型別,否則程式會報錯,這個有待進一步測試
    列出3行樣例資料
useid age gender occupation zipcode
1 24 M technician 85711
2 53 F other 94043
3 23 M writer 32067
  • 啟動 pyspark
// python  code
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession.builder.appName("python spark SQL basic example").getOrCreate()

sc = spark.sparkContext

userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/user_test.txt").map(lambda line
: line.split(",")) df = userrdd.map(lambda fields: Row(userid = fields[0],age = int(fields[1]),gender = fields[2],occupation = fields[3],zipcode = fields[4])) schemauser = spark.createDataFrame(df) schemauser.createOrReplaceTempView("user") schemauser.describe("userid","age","gender","occupation","zipcode").show() // spark-submit /data/spark/bin/spark-submit --master yarn \ --deploy-mode cluster \ --num-executors 2 \ --executor-memory '1024m' \ --executor-cores 1 \ /data/project/spark/python/load.py // 注意如果有錯誤,將錯誤解決了,還是報同樣的錯誤,那基本上可以肯定是快取在作怪,需要刪除當前目錄下 "metastore_db" 這個目錄。python 程式碼還可以直接提交 spark-submit load.py 只是這種方式使用的 spark standalone 叢集管理模式 // scala code package spark.mllib import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import scala.collection.mutable import org.apache.spark.{SparkConf,SparkContext} object loadData { def main(args: Array[String]) { val userSchema: StructType = StructType(mutable.ArraySeq( StructField("userid",IntegerType,nullable = false), StructField("age",IntegerType,nullable = false), StructField("gender",StringType,nullable = false), StructField("occupation",StringType,nullable = false), StructField("zipcode",StringType,nullable = false) )) val conf = new SparkConf().setAppName("load data") val sc = new SparkContext(conf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) val userData = sc.textFile("hdfs://master:9000/u01/bigdata/user_test.txt").map { lines => val line = lines.split(",") Row(line(0).toInt,line(1)toInt,line(2),line(3),line(4)) } val userTable = sqlContext.createDataFrame(userData,userSchema) userTable.registerTempTable("user") userTable.describe("userid","age","gender","occupation","zipcode").show sqlContext.sql("SELECT max(userid) as useridMax FROM user").show() } } // spark-submit /data/spark/bin/spark-submit --master yarn \ --deploy-mode cluster \ --num-executors 2 \ --executor-memory '1024m' \ --executor-cores 1 \ --class spark.mllib.loadData ./target/scala-2.11/mergefile_2.11-2.2.1.jar

關於 getOrCreate() 這個函式的使用方法詳情見下面的連結

探索資料

資料統計資訊

userTable.describe(“userid”,”age”,”gender”,”occupation”,”zipcode”).show
這一行程式碼會輸出一個表格

summary userid age gender occupation zipcode
count 943 943 943 943 943 943
mean 472.0 34.05196182396607 null null 50868.78810810811
stddev 272.3649512449549 12.19273973305903 null null 30891.373254138158
min 1 7 F administrator 00000
max 99 73 M writer Y1A6B

資料質量分析

  • count 統計資料的總量,有多少條記錄
    • 總數
    • 非空記錄
    • 空值 = 總數 - 非空記錄
  • mean 計算平均值
  • std 標準差
  • max 最大值
  • min 最小值
  • 百分位數:百分位數則是對應於百分位的實際數值。例如:將一個組數按小到大的順序排序,25%=3.2,表示這組數中有25%的數是小於或等於3.2,75%的數是大於或等於3.2的。至於計算方式每個演算法的計算方式不一樣,pandas的計算方式沒有找到。
    • 25%
    • 50%
    • 75%

百分位數計算方法

# linux伺服器匯入python matplotlib.pyplot報錯


import matplotlib as mpl

mpl.use('Agg')

# 再執行

import matplotlib.pyplot as plt

# 需要儲存圖片到指定的目錄

plt.savefig("/home/yourname/test.jpg") 

# 即使是能夠保持圖片,但是圖片傳到Windows平臺開啟是空白的,畫圖看來最好還是要在Windows平臺上執行

# 完整的程式碼,在Windows 平臺上執行

# coding:utf-8

import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv("C:\\Users\\ljb\\Desktop\\catering_sale.csv")
print(df.count())
print(df.describe())
plt.figure()
bp = df.boxplot(return_type="dict")
# fliers 為異常資料標籤
x = bp[ 'fliers' ][0].get_xdata()
y = bp[ 'fliers'][0].get_ydata()
y.sort()
print("x: ",x)
print("y: ",y)

# 用 annotate 添加註釋
for i in range(len(x)):
    plt.annotate(y[i],xy=(x[i],y[i]),xytext=(x[i] + 0.1 - 0.8/(y[i] - y[i-1]),y[i]))

plt.show()

/*
sale_date    200
sale_amt     198  從這裡可以看出有2個空白的值
dtype: int64
          sale_amt
count   198.000000
mean   2765.545152
std     709.557639
min      22.000000
25%    2452.725000
50%    2655.850000
75%    3023.500000
max    9106.440000
x:  [ 1.  1.  1.  1.  1.  1.]
y:  [   22.     865.    1060.    4065.2   6607.4   9106.44]

*/

image

從上圖可以看出有 6 個可能的異常值都在圖上表現出來,但是具體是否異常,需要和銷售人員確認

資料特徵分析

特徵:用於模型訓練的變數。可以看做資料的屬性。
1. 如果這些資料是記錄人,那特徵就是年齡,性別,籍貫,收入等。
2. 如果這些資料記錄的是某個商品,那特徵就是商品類別,價格,產地,生成日期,銷售數量等

  • 特徵分佈分析與相關性分析
    • 有助於發現相關資料的分佈特徵、分佈型別、分佈是否鄧超等,可以使用視覺化方法,這樣便於直觀發現特徵的異常值
  • 對比分析
  • 統計量分析

特徵資料分析,例子

# coding:utf-8

from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
import sys

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

sc = spark.sparkContext

userrdd = sc.textFile("C:\\Users\\ljb\\Desktop\\user_test.txt").map(lambda line:line.split(","))

df = userrdd.map(lambda fields: Row(userid = fields[0],age = int(fields[1]),gender = fields[2],
                                    occupation = fields[3],zipcode = fields[4]))
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")
age = spark.sql("SELECT * FROM user")

ages = age.rdd.map(lambda p:p.age).collect()


# 分析"年齡",這個特徵,通過直方圖的形式顯示出來
plt.hist(ages,bins=10,color="lightblue",normed=True)
plt.show()

image

image

#hist 對這個函式做簡要的說明
# 先看下原始碼:
def hist(x, bins=None, range=None, normed=False, weights=None, cumulative=False,
         bottom=None, histtype='bar', align='mid', orientation='vertical',
         rwidth=None, log=False, color=None, label=None, stacked=False,
         hold=None, data=None, **kwargs):
    ax = gca()
    # Deprecated: allow callers to override the hold state
    # by passing hold=True|False
    washold = ax._hold

    if hold is not None:
        ax._hold = hold
        from matplotlib.cbook import mplDeprecation
        warnings.warn("The 'hold' keyword argument is deprecated since 2.0.",
                      mplDeprecation)
    try:
        ret = ax.hist(x, bins=bins, range=range, normed=normed,
                      weights=weights, cumulative=cumulative, bottom=bottom,
                      histtype=histtype, align=align, orientation=orientation,
                      rwidth=rwidth, log=log, color=color, label=label,
                      stacked=stacked, data=data, **kwargs)
    finally:
        ax._hold = washold

    return ret

/*
返回的值是一個tuple(n.bins,patches) or  ([n0, n1, ...], bins, [patches0, patches1,...]) (輸入的資料是多重資料)
引數解釋:
x: 一個數組,主要對這個陣列中的資料畫圖,可以是多維陣列。
bins:總共有幾個條狀,預設是10
color:表示直方圖的顏色
*/

我們還可以進一步分析使用者職業分佈特徵

# coding:utf-8

from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
import sys
import numpy as np

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

sc = spark.sparkContext

userrdd = sc.textFile("C:\\Users\\ljb\\Desktop\\user_test.txt").map(lambda line:line.split(","))

df = userrdd.map(lambda fields: Row(userid = fields[0],age = int(fields[1]),gender = fields[2],
                                    occupation = fields[3],zipcode = fields[4]))
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")

# 查詢 occupation(職業),並按其分組,然後統計每個職業出現的次數,最後以職業出現的次數進行排序升序
count_occp = spark.sql("SELECT occupation,count(occupation) as cnt FROM user GROUP BY occupation ORDER BY  cnt")
count_occp.show(21) # 顯示前21行資料

# 獲取職業名稱及對應出現的次數,以便畫出各職業總數圖
# 把執行的結果轉換成 RDD
x_axis = count_occp.rdd.map(lambda p:p.occupation).collect()
y_axis = count_occp.rdd.map(lambda p:p.cnt).collect()

pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes()
# 設定 x 的刻度
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
plt.bar(pos,y_axis,width,color="orange")
plt.xticks(rotation=30)
fig = plt.gcf()
fig.set_size_inches(16,10)
plt.show()

對比分析的例子

# coding:utf-8

import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_csv("C:\\Users\\ljb\\Desktop\\catering_sale.csv",header=0,index_col='sale_date',parse_dates=True)
df1 = df.fillna(0)
df_ym = df1.resample("M",how="sum")
df2 = df_ym.to_period("M")

df2.plot(kind="bar",rot=30)
plt.show()

資料視覺化

  • 通過資料視覺化可以幫助我們發現數據的異常值、特徵的分佈情況等,為資料預處理提供重要的支援
  • spark 對資料的視覺化功能還很弱,這裡需要使用python 或者 R
  • Python 視覺化可以使用 matplotlib 和 plot
  • 下面使用 sin(x),cos(x) 這兩個函式分佈介紹 matplotlib 和 plot 這兩種python 資料視覺化的方法
# matplotlib 視覺化方法
# coding:utf-8

import numpy as np
import matplotlib
import matplotlib.pyplot as plt

# 畫圖中能夠顯示中文
plt.rcParams[ 'font.sans-serif' ] = [ 'SimHei' ]
# 防止座標軸上 "-" 號變成方塊
plt.rcParams[ 'axes.unicode_minus' ] = False

# np.linspace(args1,args2,args3) 生成等差數列 args1:起始值,從什麼數開始,args2:結束值,args3:生成多個數
x = np.linspace(0,10,100)
y = np.sin(x)
y1 = np.cos(x)

# 畫布的長度是10,寬度是6
plt.figure(figsize=(10,6))
# label 標籤在圖上顯示兩個 $$ 括起來的部分:sin(x),線的顏色是紅色,線的寬度是2
plt.plot(x,y,label="$sin(x)$",color="red",linewidth=2)
# "b--" 表示藍色虛線
plt.plot(x,y1,"b--",label="$cos(x^2)$")
# 設定 X 軸標籤
plt.xlabel(u"X 值")
# 設定 Y 軸標籤
plt.ylabel(u"Y 值")
# 設定影象的標題
plt.title(u"三角函式影象")
# y 軸最大值:1.2和最小值:-1.2,這個沒有多大意義,三角函式取值範圍 -1 <= y <= 1
plt.ylim(-1.2,1.2)
# 顯示圖例,就是左上角會有紅色線條表示 sin(x),藍色虛線表示 cos(x^2)
plt.legend()
# 將圖片儲存至當前目錄下
plt.savefig("fig01.jpg")
# 顯示圖片
plt.show()

# plot 視覺化

# coding:utf-8

from pandas import DataFrame
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
x = np.linspace(0,10,100)
df = DataFrame({'sin(x)':np.sin(x),'cos(x)':np.cos(x)},index=x)

df.plot()
plt.show()

資料預處理

資料清理

  • 填補缺失資料
# coding:utf-8

import  pandas as pd
import lxml


df = pd.read_csv("C:\\Users\\ljb\\Desktop\\catering_sale.csv",header=0)

# 顯示缺失的資料
print(df[df.isnull().values == True])

# 使用 0 填補空值
print(df.fillna(0))
# 使用該列的平均值填補空值
print(df["sale_amt"].fillna(df["sale_amt"].count()))

# 使用該列的前一行值填補空值
print(df.fillna(method="pad"))
  • 光滑噪聲資料。有兩類處理方法
    • 分箱
    • 聚類
  • 處理奇異資料
# 進入 Pyspark,讀取資料
df = spark.read.csv("/sparkMLlib/catering_sale.csv",header=True)
# 轉換資料型別
df1 = df.select(df['sale_date'],df['sale_amt'].cast("Double"))
# df 資料型別:DataFrame[sale_date: string, sale_amt: string]
# df1 資料型別:DataFrame[sale_date: string, sale_amt: double]

# 將 "sale_amt" 列,值為 22.0 替換成 200.0
df1.replace(22.0,200.0,'sale_amt')

# 去掉資料項前後的空格
# 如果所有的資料項前後都沒有空格使用show()是看不出效果的,show() 的前面可能為了排版的需要有很多空格。
# 這個例子中第一行前後是有空格的,這樣就很明顯

In [3]: from pyspark.sql.functions import *
In [23]: df.select(trim(df.sale_date)).show()
+---------------+
|trim(sale_date)|
+---------------+
|      2015/2/28|
|      2015/2/27|
|      2015/2/26|
|      2015/2/25|
|      2015/2/24|
|      2015/2/23|
|      2015/2/22|
|      2015/2/21|
|      2015/2/20|
|      2015/2/19|
|      2015/2/18|
|      2015/2/16|
|      2015/2/15|
|      2015/2/14|
|      2015/2/13|
|      2015/2/12|
|      2015/2/11|
|      2015/2/10|
|       2015/2/9|
|       2015/2/8|
+---------------+
only showing top 20 rows


In [24]: df.select(df.sale_date).show()
+-------------+
|    sale_date|
+-------------+
|  2015/2/28  |
|    2015/2/27|
|    2015/2/26|
|    2015/2/25|
|    2015/2/24|
|    2015/2/23|
|    2015/2/22|
|    2015/2/21|
|    2015/2/20|
|    2015/2/19|
|    2015/2/18|
|    2015/2/16|
|    2015/2/15|
|    2015/2/14|
|    2015/2/13|
|    2015/2/12|
|    2015/2/11|
|    2015/2/10|
|     2015/2/9|
|     2015/2/8|
+-------------+
only showing top 20 rows

# 只保留年份

In [27]: df.select(substring(trim(df.sale_date),1,4).alias('year'),df.sale_amt).show()
+----+--------+
|year|sale_amt|
+----+--------+
|2015|  2618.2|
|2015|  2608.4|
|2015|  2651.9|
|2015|  3442.1|
|2015|  3393.1|
|2015|  3136.6|
|2015|  3744.1|
|2015|  6607.4|
|2015|  2060.3|
|2015|  3614.7|
|2015|  3295.5|
|2015|  2332.1|
|2015|  2699.3|
|2015|    null|
|2015|  3036.8|
|2015|     865|
|2015|  3014.3|
|2015|  2742.8|
|2015|  2173.5|
|2015|  3161.8|
+----+--------+
only showing top 20 rows
# substring(str,pos,len) 返回 str 子串,從 pos 位置(包含) 返回 len 長度的子串
  • 糾正錯誤資料
  • 刪除重複資料
  • 刪除唯一性屬性
  • 除去不相關欄位或特徵
  • 處理不一致資料等

資料變換

  • 規範化
  • 離散化
  • 衍生指標
  • 類別特徵數值化
  • 平滑噪音
資料預處理 演算法 功能簡介
特徵抽取 TF-IDF 統計文件的詞頻–> 逆向檔案評率(TF-IDF)
特徵抽取 Word2Vec 將文件轉換成向量
特徵裝換 Tokeniziation Tockenization 將文字劃分為獨立的個體
特徵裝換 StopWordsRemover 刪除所有停用詞
特徵裝換 PCA 使用 PCA 可以對變數集合進行降維
特徵裝換 StringIndexer StringIndexer 將字串列編碼為標籤索引列
特徵裝換 OneHotEncoder 將標籤指標對映為0/1 的向量
特徵裝換 Normalizer 規範每個向量以具有單位範數
特徵裝換 StandardScaler 標準化每個特徵使得其有統一的標準差以及(或者)均值為0,方差為1
特徵裝換 VectorAssembler 將給定的多列表組合成一個單一的向量列
# 定義特徵向量
featuresArray = ['season','yr','mnth','hr','holiday','weekday','workingda','weathersit','temp','atemp','hum','windspeed']

# 把各特徵組合成特徵向量 features 
assembler = VectorAssembler(inputCols=featuresArray,outputCol='features')

# 選擇貢獻度較大的前 5 個特徵

selectorfeature = ChisSqSelector(numTopFeatures=5,featuresCol="features",outputCol="selectedFeatures",labelCol='label')

資料整合

  • 資料整合:將多個檔案或者多資料庫中的資料進行合併,然後存放在一個一致的資料儲存中
  • 資料整合一般通過 join,union,merge 等關鍵字將兩個或者多個數據集連線在一起。Spark SQL(包括 DataFrame) 有join,pandas 下有 merge方法
  • 資料整合往往需要耗費很多資源,尤其是大資料間的整合涉及shuffle過程,有時候需要牽涉多個節點,所以資料整合一般要考慮資料一致性的問題和效能問題
  • 傳統的資料庫一般是在單機上採用 hash join 方法,分散式環境中,採用 join時,可以考慮充分利用分散式資源進行平行化(也就是提高併發度,可以通過增加分割槽數來實現),當然在 join 之前,對資料過濾或歸約也是常用的優化方法
  • Spark SQL 3種 join 方法
    • broadcast hash join:如果 join 表中有一張大表和一張較小的表,可以考慮把小表廣播分發到大表所在的分割槽節點上,分別併發第與其上的分割槽記錄進行 hash join
    • shuffle hash join:如果兩個表都不小,對資料量較大的表進行廣播分佈就不太合適,這種情況可以根據 join key 相同分割槽也相同的原理,將兩個表分別按照 join key 進行重新組織分割槽,這樣就可以將 join 分而治之,劃分為很多小的 join,充分利用叢集資源並行化
    • sort merge join:對資料量較大的表可以考慮使用 sort merge join 方法,先將兩張大表根據 join key 進行重新分割槽,兩張表資料會分佈到整個叢集,以便分散式並行處理,然後,對單個分割槽節點的兩表資料分佈進行排序,最後,對排好序的兩種分割槽表執行 join操作
  • DataFrame 中的 join(或 merge) 方式:內連線,左連線,右連線

資料歸約

資料歸約:刪除或減少資料的冗餘性(降維就是資料歸約其中的一種技術)、精簡資料集等,使得歸約後資料比原資料小,甚至小很多,但仍然接近於保持原資料的完整性,且結果與歸約前後結果相同或幾乎相同

資料預處理 演算法 功能簡介
特徵選擇或降維 VectorSlicer 得到一個新的原始特徵子集的特徵向量
特徵選擇或降維 RFormula 通過 R 模型公式來將資料中的欄位轉換為特徵值
特徵選擇或降維 PCA 使用 PCA 方法可以對變數集合進行降維
特徵選擇或降維 SVD
特徵選擇或降維 ChiSqSelector 根據分類的卡方獨立性檢驗來對特徵排序,選取類別標籤主要依賴的特徵
  • SVD,PCA example
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.{SparkContext,SparkConf}
object chooseFeatures {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("dataReduce").setMaster("local")
    val sc = new SparkContext(conf)
    val data = Array(Vectors.dense(1,2,3,4,5,6,7,8,9),
      Vectors.dense(5,6,7,8,9,0,8,6,7),
      Vectors.dense(9,0,8,7,1,4,3,2,1),
      Vectors.dense(6,4,2,1,3,4,2,1,5),
      Vectors.dense(4,5,7,1,4,0,2,1,8))

    val dataRDD = sc.parallelize(data,2)
    val mat : RowMatrix = new RowMatrix(dataRDD)
    val svd  = mat.computeSVD(3,computeU = true)
    val U:RowMatrix = svd.U
    val s:Vector = svd.s
    val V:Matrix = svd.V
    println("U: " ,U)
    println("V: " ,V)
    println("s: " ,s)

    val pc:Matrix = mat.computePrincipalComponents(3)
    println("pc: ",pc)

/*
output:
(U: ,[email protected])
(V: ,-0.33309047675110115  0.6307611082680837    0.10881297540284612   
-0.252559026169606    -0.13320654554805747  0.4862541277385016    
-0.3913180354223819   0.3985110846022322    0.20656596253983592   
-0.33266751598925126  0.25621153877501424   -0.3575093420454635   
-0.35120996186827147  -0.24679309180949208  0.16775460006130793   
-0.1811460330545444   0.03808707142157401   -0.46853660508460787  
-0.35275045425261     -0.19100365291846758  -0.26646095393100677  
-0.2938422406906167   -0.30376401501983874  -0.4274842789454556   
-0.44105410502598985  -0.4108875465911952   0.2825275707788212    )
(s: ,[30.88197557931219,10.848035248251415,8.201924156089822])
(pc: ,-0.3948204553820511   -0.3255749878678745   0.1057375753926894    
0.1967741975874508    0.12066915005125914   0.4698636365472036    
-0.09206257474269655  -0.407047128194367    0.3210095555021759    
0.12315980051885281   -0.6783914405694824   -0.10049065563002131  
0.43871546256175087   -0.12704705411702932  0.2775911848440697    
-0.05209780173017968  0.10583033338605327   -0.6473697692806737   
0.422474587406277     -0.27600606797384     -0.13909137208338707  
0.46536643478632944   -0.172268807944553    -0.349731653791416    
0.4376262507870099    0.3469015236606571    0.13076351966313637   )
*/

構建模型

演算法選擇的原則:
- 業務需求、資料特徵、演算法適應性。個人經驗等
- 選擇幾種演算法進行比較
- 採用整合學習的方式,複合多種演算法也是選項之一。如:先採用聚類方法對資料進行聚類,然後對不同的類別資料進行預測和推薦
- 從簡單和熟悉的演算法入手,然後不斷的完善和優化

spark ML 目前支援的演算法

型別 spark ML 目前支援的演算法
分類 邏輯迴歸,分二項邏輯迴歸 (Binomial logistic regression) 和多項邏輯迴歸(Multinomial logistic regression)
分類 決策樹分類 (Decision tree classifier)
分類 隨機森林分類 (Random forest classifier)
分類 梯度提升決策樹分類 (Gradient-boosted tree classifier)
分類 多層感知機分類 (Multilayer perceptron classifier)
分類 一對多分類 (One-vs-Rest classifier)
分類 樸素貝葉斯 (native Bayes)
迴歸 線性迴歸 (Linear regression)
迴歸 廣義線性迴歸 (Generalized liner regression)
迴歸 決策樹迴歸 (Decision tree regression)
迴歸 隨機森林迴歸 (Random forest regression)
迴歸 梯度提升決策樹迴歸 (Gradient-boosted tree regression)
迴歸 生存迴歸 (Survival regression)
迴歸 保序迴歸 (Isotonic regression)
推薦 協同過濾 (Collaborative filtering)
聚類 K-均值(k-means)
聚類 高斯混合模型 (Gaussian Mixture Model)
聚類 主題模型 (latent Dirichlet allocation LDA)
聚類 二分 K 均值 (bisecting k-means)
  • 演算法確定了還需要設定一些引數,如訓練決策樹的時候需要選擇迭代次數、純度計算方法、樹的最大高度等
  • 資料劃分為訓練資料和測試資料,訓練資料用來訓練模型,測試資料用來驗證模型,這種驗證方式屬於交叉驗證(CrossValidator CV)
  • K-CV (K-fold Cross Validator) K 折交叉驗證,不重複地隨機將資料劃分為 k 份,如 K = 3,則將產生 3 個(訓練/測試) 資料集對,每個資料集使用 2/3 的資料進行訓練,1/3 的資料進行測試。這樣就會得到3個模型,用這 3 個模型的平均數作為最終模型的效能指標。K-CV 可以有效的避免欠學習狀態的發生,其結果比較有說服力
  • spark 提供了多種資料劃分的方法:randomSplit、CrossValidator等

模型評估方法

  • 對模型的效能、與目標的切合度等進行評估
  • 評估指標:精確度,ROC,RMSE等,這些指標是重要而基礎的,但不是唯一和最終指標,除了這些指標,我們還應該評估模型對業務的提示或商業目標的達成等方面的貢獻
  • spark 評估演算法:
    • 均方差 (MSE,Mean Squared Error)
    • 均方差更 (RMSE Root Mean Squared Error)
    • 平均絕對值誤差 (MAE,Mean Absolue Error)

評估公式

  • 混淆矩陣(confusion matrix):簡單的矩陣,用於展示一個二分類器的預測結果,其中 T 為 True,F 為 False、N 為 Negative(負樣本)、P 為 Postitive(正樣本)
    • 真正(TP):被模型預測為正的正樣本數(本身是正樣本,模型預測也是正樣本),可以稱作判斷為真的正確率
    • 真負(TN):被模型預測為負的負樣本數(本身是負樣本,模型預測也是負樣本),可以稱作判斷為假的正確率
    • 假正(FP):被模型預測為正的負樣本數,(本身是負樣本,模型預測是正樣本),可以稱作誤報率
    • 假負(FN):被模型預測為負的正樣本數,(本身是正樣本,模型預測是負樣本),可以稱作為漏報率

混淆矩陣

  • 評估指標:
    • 準確率(Precision):反映了被分類器判定的正例中真正的正例樣本的比重 P = TP/(TP+FP)
    • 錯誤率(Error):模型預測錯誤佔整個正負樣本的比重。E = (FP+FN) / (P+N) = (FP+FN) / (FP+FN+TP+TN)
    • 正確率(Accuracy):模型預測正確佔整個正負樣本的比重。 A = (TP+TN) / (P+N) = (TP+TN+FN+FP)
    • 召回率(Recall):反映了被正確判定的正例佔總正例的的比重 R = (TP) / (TP + FN)
    • A + E = 1
    • F1-Measure f-measure是一種統計量,F-Measure又稱為F-Score,F-Measure是Precision(準確率)和Recall(召回率)加權調和平均
    • 真陽率(TPR):代表分類器預測的正類中實際正例項佔所有正例項的比重。TPR = TP/(TP+FN)
    • 假陽率(FPR):程式碼分類器預測的正類中實際負例項佔所有負例項的比重。FRP = FP/(FP+TN)

F1-Measure

package spark.mllib

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.{
  BinaryClassificationEvaluator,
  MulticlassClassificationEvaluator
}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
object modelAssess {
  def main(args: Array[String]): Unit = {
    val path = args(0)
    // sparkSession 是 Spark SQL 的入口,資料結構是 DataFrame。sc Spark CORE 的入口,資料結構是 RDD,從 Spark 2.0 及以後應儘量使用 DataFrame和DataSet

    val spark: SparkSession = SparkSession.builder
      .appName("modelAsess")
      .master("yarn")
      .config("spark.testing.memory", "471859200")
      .getOrCreate
    // DataFrame.read return DataFrameReader。read 預設支援的格式 parquet,hive 支援的列式儲存的檔案格式
    // format(source: String) 指定源資料的格式
    // load(path: String) 載入資料
    // data : dataFrame
    val data = spark.read.format("libsvm").load(path)
    // randomSplit(weight: Array[Double],seed : Long)
    // 按照 Double 型別的陣列提供的權重值來隨機切分資料,seed 可以理解為新增的雜誌,增加隨機性
    val Array(trainingData, testData) =
      data.randomSplit(Array(0.7, 0.3), seed = 1234L)
    // LogisticRegression 是一個類
    //.setThreshold(value : Double) 二分類中,設定閥值,概率大於該值的預測為1,概率小於該值預測為0。預設值:0.5.如果此值過大,那麼很多標籤都會被預測為0,如果此值過小,很多標籤被預測為1
    // setMaxIter(value: Int) 設定最大迭代次數,預設是100
    // setRegParam(value : Double) 設定正則化引數,預設是 0.0
    // setElasticNetParam(value: Double) value = 0.0 使用 L2 正則化,如果value = 1.0 使用 L1 正則化,如果 0.0 < value < 1.0 使用 L1 和 L2 組合的正則化。注意如果是 fit 優化只支援 L2 正則化
    // 正則化主要是對權重比較大的特徵進行懲罰,避免過度依賴某個特徵造成過擬合
    val lr = new LogisticRegression()
      .setThreshold(0.6)
      .setMaxIter(10)
      .setRegParam(0.3)
      .setElasticNetParam(0.8)
    // fit(dataset: DataSet) 使用這個構建模型,輸入資料是訓練資料
    val lrMode = lr.fit(trainingData)
    // transform(dataSet: DataSet) : DataFrame  將測試資料輸入開始預測
    val predictions = lrMode.transform(testData)
    predictions.show()

    // BinaryClassificationEvaluator 這個類是用來評估二分類演算法構建的模型的預測效果,有兩個期望輸入列:label 標籤列和 rawPrediction
    // setLabelCol 設定標籤列的列名
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")

    // evaluate(dataSet: DataSet) : Double  評估模型的預測結果,返回一個度量值
    // def isLargerBetter: Boolean true:評估返回的指標應最大化,false:評估返回的值應最小化
    val accuracy = evaluator.evaluate(predictions)

    // RegressionMertrics 這個類是用來評估迴歸模型
    // new RegressionMetrics(predictionAndObservations: RDD[(Double, Double)])
    /* 
    val dataFrame1 = predictions.select("prediction","label")
    dataFrame1 : org.apache.spark.sql.DataFrame
    val rdd1 = dataFram1.rdd
    rdd1 : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
    rdd1.rdd.map(x =>(x(0).asInstanceOf[Double],x(1).asInstanceOf[Double]))
    return org.apache.spark.rdd.RDD[(Double, Double)]

    x :org.apache.spark.sql.Row = [0.0,0.0]
    x(0) : Any = 0.0
    x(1) : Any = 0.0
    x(0).asInstanceOf[Double] 將傳入的物件轉換成 Double 型別,這裡就是將 Any 轉換成 Double 型別
    */
    val rm2 = new RegressionMetrics(
      predictions
        .select("prediction", "label")
        .rdd
        .map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

    /*
    def meanSquaredError: Double
    def meanAbsoluteError: Double
    def rootMeanSquaredError: Double
    */
    println("MSE: ", rm2.meanSquaredError)
    println("MAE: ", rm2.meanAbsoluteError)
    println("RMSE Squared: ", rm2.rootMeanSquaredError)

    val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
    val multiclassClassificationEvaluator: MulticlassClassificationEvaluator =
      new MulticlassClassificationEvaluator()

    printlnMetricMulti("f1", predictions, multiclassClassificationEvaluator)
    printlnMetricMulti("weightedPrecision",
                       predictions,
                       multiclassClassificationEvaluator)
    printlnMetricMulti("weightedRecall",
                       predictions,
                       multiclassClassificationEvaluator)
    printlnMetricMulti("accuracy",
                       predictions,
                       multiclassClassificationEvaluator)

    printlnMetricbinary("areaUnderROC",
                        binaryClassificationEvaluator,
                        predictions)
    printlnMetricbinary("areaUnderPR",
                        binaryClassificationEvaluator,
                        predictions)


    // A error of "value $ is not StringContext member" is reported if you don't add following line
    import spark.implicits._

    // 計算 TP,分類正確且分類為1的樣本數
    println(
      predictions
        .filter($"label" === $"prediction")
        .filter($"label" === 1)
        .count)
    // 計算 TN,分類正確且分類為0的樣本數
    println(
      predictions
        .filter($"label" === $"prediction")
        .filter($"prediction" === 0)
        .count)

    // 計算 FN,分類錯誤且分類為0的樣本數
    println(
      predictions
        .filter($"label" !== $"prediction")
        .filter($"prediction" === 0)
        .count)

    // 計算 FP,分類錯誤且分類為1的樣本數
    println(
      predictions
        .filter($"label" !== $"prediction")
        .filter($"prediction" === 1)
        .count)

    /*
    準確率: TP(TP+FP) = 17/(17+0) = 1
    召回率: TP(TP+FN) = 17/(17+1) = 0.944444
    */
  }

  // 計算準確率、召回率、正確率、F1
  def printlnMetricMulti(
      metricsName: String,
      predictions: DataFrame,
      multiclassClassificationEvaluatdor: MulticlassClassificationEvaluator)
    : Unit = {
    /*
    val multiclassClassificationEvaluatdor = new MulticlassClassificationEvaluator()
    setMetricName(metricName : String) : BinaryClassificationEvaluator.this.type   設定評估指標的名字
    evaluate(predictions) 這個同上返回一個 Double的值,該指標具體的值
    */

    println(
      metricsName + " = " + multiclassClassificationEvaluatdor
        .setMetricName(metricsName)
        .evaluate(predictions))
  }


  // 計算 AUC(area under ROC ROC 曲線下的區域的面積),area under PR PR曲線的面積
  def printlnMetricbinary(
      metricsName: String,
      binaryClassificationEvaluator: BinaryClassificationEvaluator,
      predictions: DataFrame): Unit = {

    println(
      metricsName + " = " + binaryClassificationEvaluator
        .setMetricName(metricsName)
        .evaluate(predictions))
  }
}

/*
程式碼叢集方式提交
#!/bin/bash
cd /data/project/spark/spark_workstation
/data/sbt/bin/sbt compile && /data/sbt/bin/sbt package && \
/data/spark/bin/spark-submit --master yarn \
    --deploy-mode cluster \
    --verbose \
    --num-executors 2 \
    --executor-memory '1024m' \
    --executor-cores 1 \
    --class spark.mllib.modelAssess ./target/scala-2.11/mergedata_2.11-2.2.1.jar \
    hdfs://master:9000/sparkMLlib/sample_libsvm_data.txt
*/

組裝

將資料的清洗、轉換等資料的預處理工作,以及構建模型和評估模型這些任務當做 spark Pipeline的 stage,這樣既可以保證各任務之間有序執行,也保證的處理資料的資料的一致性

// 建立 Pipeline,將各個 Stage 依次組裝在一起
val pipeline = new Pipeline().setStages(Array(tokenizer,hashingTF,lr))

// 在訓練集上擬合這個 Pipeline
val model = pipeline.fit(training)

// 在測試集上做預測

model.transform(test).select("label","prediction")

模型選擇或調優

  • 調優:使用給定的資料為給定的任務尋找最適合的模型或引數,調優可以是對單個階段進行除錯,也可以一次性對整個Pipeline 進行調優

  • MLlib 支援使用型別CorssValidator 和 TrainValidationSplit 這樣的工具進行模型選擇,這類工具有一下元件

    • Estimator:使用者調優的演算法或者Pipeline
    • ParamMap 集合:提供引數的選擇,有時又稱使用者查詢的引數網格(parameter grid),引數網格可以使用 ParamGridBuilder 來構建
    • Evaluator:衡量模型在測試資料上的擬合程度
  • 模型選擇工具工作原理如下:

    • 對輸入資料劃分為訓練資料和測試資料
    • 對於每個(訓練/測試)對,遍歷一組ParamMaps。用每一個 ParamMap 引數來擬合估計器,得到訓練後的模型,再使用評估器來評估模型的表現
    • 選擇效能表現最優的模型對應引數表

交叉驗證(CrossValidator):

  • 交叉驗證:將資料切分成 K 折資料集合,分別用於訓練資料和測試資料
  • 如果 K = 3 就會有3份 訓練/測試資料對。每一份資料對,其中訓練資料佔 2/3,測試資料佔 1/3,為了評估一個 ParamMap,CrossValidator 會計算這 3 個不同的(訓練,測試) 資料集對在 Estimator 擬合出的模型上平均評估指標
  • 在找出最好的 ParamMap後,CrossValidator 會利用此 ParamMap 在整個訓練機上訓練(fit) 出一個泛華能力強、誤差相對小的最佳模型,整個過程處於流程化管理之中,工作流程圖如下

image

交叉驗證的缺點:雖然利用 CrossValidator 來訓練模型可以提升泛華能力,但其代價比較高。如果 K =3 regParam = (0.1,0.01)、numiters = (10,20) 這樣就需要對模型訓練 3*2*2 = 12 次。然而對比啟發式的手動調優,這是選擇引數的行之有效的方法

訓練驗證切分 (TrainValidationSplit)

  • TrainValidation 建立單一的(訓練、測試)資料集對,它適用 trainRatio 引數將資料集切分成兩部分。並最終使用最好的 ParamMap 和 完整的資料集來擬合評估器
  • 例如: trainTatio = 0.8 TrainValidationSplit 80% 作為訓練資料集,20%作為測試資料集
  • TrainValidation 優點就是隻對每對引數組合評估1次,因此效能比較好,但是當訓練資料集不夠大的時候其結果相對不可信

儲存模型

  • 儲存擬合後的流水線到磁碟上
model.write.overwrite().save("/tmp/spark-logistic-regreesion-model")
  • 儲存未擬合的流水線到磁碟上
pipeline.write.overwrite().save("/tmp/spark-logistic-regression-model1")
  • 把擬合後的流水線部署到其他環境中
val sameMode = PiplelineModel.load("/tmp/spark-logistic-regreesion-model")