spark MLlib 之構建機器學習系統
構建 spark 機器學習系統
spark 機器學習系統架構
spark 和 hadoop 叢集的安裝
請參照下面的連結
spark-shell
- 1.x Spark-Shell 自動建立一個 SparkContext 物件 sc
- 2.x Spark-Shell 引入了 SparkSession 物件(spark),執行 Spark-Shell 會自動建立一個 SparkSession 物件,在輸入 spark、SparkContext、SQLContext 都已經封裝在 SparkSession 物件當中,它為使用者提供了統一的的切入點,同時提供了各種DataFrame 和 DataSet 的API。
載入資料
- 原始資料。每個欄位以 “,” 分割,源資料本來是以 “|” 分割的,但是實際處理中發現欄位不能是Int(scala StructField 程式碼段,欄位型別為IntegerType) 型別,否則程式會報錯,這個有待進一步測試
列出3行樣例資料
useid | age | gender | occupation | zipcode |
---|---|---|---|---|
1 | 24 | M | technician | 85711 |
2 | 53 | F | other | 94043 |
3 | 23 | M | writer | 32067 |
- 啟動 pyspark
// python code
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession.builder.appName("python spark SQL basic example").getOrCreate()
sc = spark.sparkContext
userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/user_test.txt").map(lambda line : line.split(","))
df = userrdd.map(lambda fields: Row(userid = fields[0],age = int(fields[1]),gender = fields[2],occupation = fields[3],zipcode = fields[4]))
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")
schemauser.describe("userid","age","gender","occupation","zipcode").show()
// spark-submit
/data/spark/bin/spark-submit --master yarn \
--deploy-mode cluster \
--num-executors 2 \
--executor-memory '1024m' \
--executor-cores 1 \
/data/project/spark/python/load.py
// 注意如果有錯誤,將錯誤解決了,還是報同樣的錯誤,那基本上可以肯定是快取在作怪,需要刪除當前目錄下 "metastore_db" 這個目錄。python 程式碼還可以直接提交 spark-submit load.py 只是這種方式使用的 spark standalone 叢集管理模式
// scala code
package spark.mllib
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.collection.mutable
import org.apache.spark.{SparkConf,SparkContext}
object loadData {
def main(args: Array[String]) {
val userSchema: StructType = StructType(mutable.ArraySeq(
StructField("userid",IntegerType,nullable = false),
StructField("age",IntegerType,nullable = false),
StructField("gender",StringType,nullable = false),
StructField("occupation",StringType,nullable = false),
StructField("zipcode",StringType,nullable = false)
))
val conf = new SparkConf().setAppName("load data")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val userData = sc.textFile("hdfs://master:9000/u01/bigdata/user_test.txt").map {
lines =>
val line = lines.split(",")
Row(line(0).toInt,line(1)toInt,line(2),line(3),line(4))
}
val userTable = sqlContext.createDataFrame(userData,userSchema)
userTable.registerTempTable("user")
userTable.describe("userid","age","gender","occupation","zipcode").show
sqlContext.sql("SELECT max(userid) as useridMax FROM user").show()
}
}
// spark-submit
/data/spark/bin/spark-submit --master yarn \
--deploy-mode cluster \
--num-executors 2 \
--executor-memory '1024m' \
--executor-cores 1 \
--class spark.mllib.loadData ./target/scala-2.11/mergefile_2.11-2.2.1.jar
關於 getOrCreate() 這個函式的使用方法詳情見下面的連結
探索資料
資料統計資訊
userTable.describe(“userid”,”age”,”gender”,”occupation”,”zipcode”).show
這一行程式碼會輸出一個表格
summary | userid | age | gender | occupation | zipcode |
---|---|---|---|---|---|
count | 943 | 943 | 943 943 | 943 | 943 |
mean | 472.0 | 34.05196182396607 | null | null | 50868.78810810811 |
stddev | 272.3649512449549 | 12.19273973305903 | null | null | 30891.373254138158 |
min | 1 | 7 | F | administrator | 00000 |
max | 99 | 73 | M | writer | Y1A6B |
資料質量分析
- count 統計資料的總量,有多少條記錄
- 總數
- 非空記錄
- 空值 = 總數 - 非空記錄
- mean 計算平均值
- std 標準差
- max 最大值
- min 最小值
- 百分位數:百分位數則是對應於百分位的實際數值。例如:將一個組數按小到大的順序排序,25%=3.2,表示這組數中有25%的數是小於或等於3.2,75%的數是大於或等於3.2的。至於計算方式每個演算法的計算方式不一樣,pandas的計算方式沒有找到。
- 25%
- 50%
- 75%
# linux伺服器匯入python matplotlib.pyplot報錯
import matplotlib as mpl
mpl.use('Agg')
# 再執行
import matplotlib.pyplot as plt
# 需要儲存圖片到指定的目錄
plt.savefig("/home/yourname/test.jpg")
# 即使是能夠保持圖片,但是圖片傳到Windows平臺開啟是空白的,畫圖看來最好還是要在Windows平臺上執行
# 完整的程式碼,在Windows 平臺上執行
# coding:utf-8
import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_csv("C:\\Users\\ljb\\Desktop\\catering_sale.csv")
print(df.count())
print(df.describe())
plt.figure()
bp = df.boxplot(return_type="dict")
# fliers 為異常資料標籤
x = bp[ 'fliers' ][0].get_xdata()
y = bp[ 'fliers'][0].get_ydata()
y.sort()
print("x: ",x)
print("y: ",y)
# 用 annotate 添加註釋
for i in range(len(x)):
plt.annotate(y[i],xy=(x[i],y[i]),xytext=(x[i] + 0.1 - 0.8/(y[i] - y[i-1]),y[i]))
plt.show()
/*
sale_date 200
sale_amt 198 從這裡可以看出有2個空白的值
dtype: int64
sale_amt
count 198.000000
mean 2765.545152
std 709.557639
min 22.000000
25% 2452.725000
50% 2655.850000
75% 3023.500000
max 9106.440000
x: [ 1. 1. 1. 1. 1. 1.]
y: [ 22. 865. 1060. 4065.2 6607.4 9106.44]
*/
從上圖可以看出有 6 個可能的異常值都在圖上表現出來,但是具體是否異常,需要和銷售人員確認
資料特徵分析
特徵:用於模型訓練的變數。可以看做資料的屬性。
1. 如果這些資料是記錄人,那特徵就是年齡,性別,籍貫,收入等。
2. 如果這些資料記錄的是某個商品,那特徵就是商品類別,價格,產地,生成日期,銷售數量等
- 特徵分佈分析與相關性分析
- 有助於發現相關資料的分佈特徵、分佈型別、分佈是否鄧超等,可以使用視覺化方法,這樣便於直觀發現特徵的異常值
- 對比分析
- 統計量分析
特徵資料分析,例子
# coding:utf-8
from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
import sys
spark = SparkSession.builder \
.appName("Python Spark SQL basic example") \
.getOrCreate()
sc = spark.sparkContext
userrdd = sc.textFile("C:\\Users\\ljb\\Desktop\\user_test.txt").map(lambda line:line.split(","))
df = userrdd.map(lambda fields: Row(userid = fields[0],age = int(fields[1]),gender = fields[2],
occupation = fields[3],zipcode = fields[4]))
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")
age = spark.sql("SELECT * FROM user")
ages = age.rdd.map(lambda p:p.age).collect()
# 分析"年齡",這個特徵,通過直方圖的形式顯示出來
plt.hist(ages,bins=10,color="lightblue",normed=True)
plt.show()
#hist 對這個函式做簡要的說明
# 先看下原始碼:
def hist(x, bins=None, range=None, normed=False, weights=None, cumulative=False,
bottom=None, histtype='bar', align='mid', orientation='vertical',
rwidth=None, log=False, color=None, label=None, stacked=False,
hold=None, data=None, **kwargs):
ax = gca()
# Deprecated: allow callers to override the hold state
# by passing hold=True|False
washold = ax._hold
if hold is not None:
ax._hold = hold
from matplotlib.cbook import mplDeprecation
warnings.warn("The 'hold' keyword argument is deprecated since 2.0.",
mplDeprecation)
try:
ret = ax.hist(x, bins=bins, range=range, normed=normed,
weights=weights, cumulative=cumulative, bottom=bottom,
histtype=histtype, align=align, orientation=orientation,
rwidth=rwidth, log=log, color=color, label=label,
stacked=stacked, data=data, **kwargs)
finally:
ax._hold = washold
return ret
/*
返回的值是一個tuple(n.bins,patches) or ([n0, n1, ...], bins, [patches0, patches1,...]) (輸入的資料是多重資料)
引數解釋:
x: 一個數組,主要對這個陣列中的資料畫圖,可以是多維陣列。
bins:總共有幾個條狀,預設是10
color:表示直方圖的顏色
*/
我們還可以進一步分析使用者職業分佈特徵
# coding:utf-8
from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
import sys
import numpy as np
spark = SparkSession.builder \
.appName("Python Spark SQL basic example") \
.getOrCreate()
sc = spark.sparkContext
userrdd = sc.textFile("C:\\Users\\ljb\\Desktop\\user_test.txt").map(lambda line:line.split(","))
df = userrdd.map(lambda fields: Row(userid = fields[0],age = int(fields[1]),gender = fields[2],
occupation = fields[3],zipcode = fields[4]))
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")
# 查詢 occupation(職業),並按其分組,然後統計每個職業出現的次數,最後以職業出現的次數進行排序升序
count_occp = spark.sql("SELECT occupation,count(occupation) as cnt FROM user GROUP BY occupation ORDER BY cnt")
count_occp.show(21) # 顯示前21行資料
# 獲取職業名稱及對應出現的次數,以便畫出各職業總數圖
# 把執行的結果轉換成 RDD
x_axis = count_occp.rdd.map(lambda p:p.occupation).collect()
y_axis = count_occp.rdd.map(lambda p:p.cnt).collect()
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes()
# 設定 x 的刻度
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
plt.bar(pos,y_axis,width,color="orange")
plt.xticks(rotation=30)
fig = plt.gcf()
fig.set_size_inches(16,10)
plt.show()
對比分析的例子
# coding:utf-8
import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_csv("C:\\Users\\ljb\\Desktop\\catering_sale.csv",header=0,index_col='sale_date',parse_dates=True)
df1 = df.fillna(0)
df_ym = df1.resample("M",how="sum")
df2 = df_ym.to_period("M")
df2.plot(kind="bar",rot=30)
plt.show()
資料視覺化
- 通過資料視覺化可以幫助我們發現數據的異常值、特徵的分佈情況等,為資料預處理提供重要的支援
- spark 對資料的視覺化功能還很弱,這裡需要使用python 或者 R
- Python 視覺化可以使用 matplotlib 和 plot
- 下面使用 sin(x),cos(x) 這兩個函式分佈介紹 matplotlib 和 plot 這兩種python 資料視覺化的方法
# matplotlib 視覺化方法
# coding:utf-8
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
# 畫圖中能夠顯示中文
plt.rcParams[ 'font.sans-serif' ] = [ 'SimHei' ]
# 防止座標軸上 "-" 號變成方塊
plt.rcParams[ 'axes.unicode_minus' ] = False
# np.linspace(args1,args2,args3) 生成等差數列 args1:起始值,從什麼數開始,args2:結束值,args3:生成多個數
x = np.linspace(0,10,100)
y = np.sin(x)
y1 = np.cos(x)
# 畫布的長度是10,寬度是6
plt.figure(figsize=(10,6))
# label 標籤在圖上顯示兩個 $$ 括起來的部分:sin(x),線的顏色是紅色,線的寬度是2
plt.plot(x,y,label="$sin(x)$",color="red",linewidth=2)
# "b--" 表示藍色虛線
plt.plot(x,y1,"b--",label="$cos(x^2)$")
# 設定 X 軸標籤
plt.xlabel(u"X 值")
# 設定 Y 軸標籤
plt.ylabel(u"Y 值")
# 設定影象的標題
plt.title(u"三角函式影象")
# y 軸最大值:1.2和最小值:-1.2,這個沒有多大意義,三角函式取值範圍 -1 <= y <= 1
plt.ylim(-1.2,1.2)
# 顯示圖例,就是左上角會有紅色線條表示 sin(x),藍色虛線表示 cos(x^2)
plt.legend()
# 將圖片儲存至當前目錄下
plt.savefig("fig01.jpg")
# 顯示圖片
plt.show()
# plot 視覺化
# coding:utf-8
from pandas import DataFrame
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
x = np.linspace(0,10,100)
df = DataFrame({'sin(x)':np.sin(x),'cos(x)':np.cos(x)},index=x)
df.plot()
plt.show()
資料預處理
資料清理
- 填補缺失資料
# coding:utf-8
import pandas as pd
import lxml
df = pd.read_csv("C:\\Users\\ljb\\Desktop\\catering_sale.csv",header=0)
# 顯示缺失的資料
print(df[df.isnull().values == True])
# 使用 0 填補空值
print(df.fillna(0))
# 使用該列的平均值填補空值
print(df["sale_amt"].fillna(df["sale_amt"].count()))
# 使用該列的前一行值填補空值
print(df.fillna(method="pad"))
- 光滑噪聲資料。有兩類處理方法
- 分箱
- 聚類
- 處理奇異資料
# 進入 Pyspark,讀取資料
df = spark.read.csv("/sparkMLlib/catering_sale.csv",header=True)
# 轉換資料型別
df1 = df.select(df['sale_date'],df['sale_amt'].cast("Double"))
# df 資料型別:DataFrame[sale_date: string, sale_amt: string]
# df1 資料型別:DataFrame[sale_date: string, sale_amt: double]
# 將 "sale_amt" 列,值為 22.0 替換成 200.0
df1.replace(22.0,200.0,'sale_amt')
# 去掉資料項前後的空格
# 如果所有的資料項前後都沒有空格使用show()是看不出效果的,show() 的前面可能為了排版的需要有很多空格。
# 這個例子中第一行前後是有空格的,這樣就很明顯
In [3]: from pyspark.sql.functions import *
In [23]: df.select(trim(df.sale_date)).show()
+---------------+
|trim(sale_date)|
+---------------+
| 2015/2/28|
| 2015/2/27|
| 2015/2/26|
| 2015/2/25|
| 2015/2/24|
| 2015/2/23|
| 2015/2/22|
| 2015/2/21|
| 2015/2/20|
| 2015/2/19|
| 2015/2/18|
| 2015/2/16|
| 2015/2/15|
| 2015/2/14|
| 2015/2/13|
| 2015/2/12|
| 2015/2/11|
| 2015/2/10|
| 2015/2/9|
| 2015/2/8|
+---------------+
only showing top 20 rows
In [24]: df.select(df.sale_date).show()
+-------------+
| sale_date|
+-------------+
| 2015/2/28 |
| 2015/2/27|
| 2015/2/26|
| 2015/2/25|
| 2015/2/24|
| 2015/2/23|
| 2015/2/22|
| 2015/2/21|
| 2015/2/20|
| 2015/2/19|
| 2015/2/18|
| 2015/2/16|
| 2015/2/15|
| 2015/2/14|
| 2015/2/13|
| 2015/2/12|
| 2015/2/11|
| 2015/2/10|
| 2015/2/9|
| 2015/2/8|
+-------------+
only showing top 20 rows
# 只保留年份
In [27]: df.select(substring(trim(df.sale_date),1,4).alias('year'),df.sale_amt).show()
+----+--------+
|year|sale_amt|
+----+--------+
|2015| 2618.2|
|2015| 2608.4|
|2015| 2651.9|
|2015| 3442.1|
|2015| 3393.1|
|2015| 3136.6|
|2015| 3744.1|
|2015| 6607.4|
|2015| 2060.3|
|2015| 3614.7|
|2015| 3295.5|
|2015| 2332.1|
|2015| 2699.3|
|2015| null|
|2015| 3036.8|
|2015| 865|
|2015| 3014.3|
|2015| 2742.8|
|2015| 2173.5|
|2015| 3161.8|
+----+--------+
only showing top 20 rows
# substring(str,pos,len) 返回 str 子串,從 pos 位置(包含) 返回 len 長度的子串
- 糾正錯誤資料
- 刪除重複資料
- 刪除唯一性屬性
- 除去不相關欄位或特徵
- 處理不一致資料等
資料變換
- 規範化
- 離散化
- 衍生指標
- 類別特徵數值化
- 平滑噪音
資料預處理 | 演算法 | 功能簡介 |
---|---|---|
特徵抽取 | TF-IDF | 統計文件的詞頻–> 逆向檔案評率(TF-IDF) |
特徵抽取 | Word2Vec | 將文件轉換成向量 |
特徵裝換 | Tokeniziation | Tockenization 將文字劃分為獨立的個體 |
特徵裝換 | StopWordsRemover | 刪除所有停用詞 |
特徵裝換 | PCA | 使用 PCA 可以對變數集合進行降維 |
特徵裝換 | StringIndexer | StringIndexer 將字串列編碼為標籤索引列 |
特徵裝換 | OneHotEncoder | 將標籤指標對映為0/1 的向量 |
特徵裝換 | Normalizer | 規範每個向量以具有單位範數 |
特徵裝換 | StandardScaler | 標準化每個特徵使得其有統一的標準差以及(或者)均值為0,方差為1 |
特徵裝換 | VectorAssembler | 將給定的多列表組合成一個單一的向量列 |
# 定義特徵向量
featuresArray = ['season','yr','mnth','hr','holiday','weekday','workingda','weathersit','temp','atemp','hum','windspeed']
# 把各特徵組合成特徵向量 features
assembler = VectorAssembler(inputCols=featuresArray,outputCol='features')
# 選擇貢獻度較大的前 5 個特徵
selectorfeature = ChisSqSelector(numTopFeatures=5,featuresCol="features",outputCol="selectedFeatures",labelCol='label')
資料整合
- 資料整合:將多個檔案或者多資料庫中的資料進行合併,然後存放在一個一致的資料儲存中
- 資料整合一般通過 join,union,merge 等關鍵字將兩個或者多個數據集連線在一起。Spark SQL(包括 DataFrame) 有join,pandas 下有 merge方法
- 資料整合往往需要耗費很多資源,尤其是大資料間的整合涉及shuffle過程,有時候需要牽涉多個節點,所以資料整合一般要考慮資料一致性的問題和效能問題
- 傳統的資料庫一般是在單機上採用 hash join 方法,分散式環境中,採用 join時,可以考慮充分利用分散式資源進行平行化(也就是提高併發度,可以通過增加分割槽數來實現),當然在 join 之前,對資料過濾或歸約也是常用的優化方法
- Spark SQL 3種 join 方法
- broadcast hash join:如果 join 表中有一張大表和一張較小的表,可以考慮把小表廣播分發到大表所在的分割槽節點上,分別併發第與其上的分割槽記錄進行 hash join
- shuffle hash join:如果兩個表都不小,對資料量較大的表進行廣播分佈就不太合適,這種情況可以根據 join key 相同分割槽也相同的原理,將兩個表分別按照 join key 進行重新組織分割槽,這樣就可以將 join 分而治之,劃分為很多小的 join,充分利用叢集資源並行化
- sort merge join:對資料量較大的表可以考慮使用 sort merge join 方法,先將兩張大表根據 join key 進行重新分割槽,兩張表資料會分佈到整個叢集,以便分散式並行處理,然後,對單個分割槽節點的兩表資料分佈進行排序,最後,對排好序的兩種分割槽表執行 join操作
- DataFrame 中的 join(或 merge) 方式:內連線,左連線,右連線
資料歸約
資料歸約:刪除或減少資料的冗餘性(降維就是資料歸約其中的一種技術)、精簡資料集等,使得歸約後資料比原資料小,甚至小很多,但仍然接近於保持原資料的完整性,且結果與歸約前後結果相同或幾乎相同
資料預處理 | 演算法 | 功能簡介 |
---|---|---|
特徵選擇或降維 | VectorSlicer | 得到一個新的原始特徵子集的特徵向量 |
特徵選擇或降維 | RFormula | 通過 R 模型公式來將資料中的欄位轉換為特徵值 |
特徵選擇或降維 | PCA | 使用 PCA 方法可以對變數集合進行降維 |
特徵選擇或降維 | SVD | |
特徵選擇或降維 | ChiSqSelector | 根據分類的卡方獨立性檢驗來對特徵排序,選取類別標籤主要依賴的特徵 |
- SVD,PCA example
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.{SparkContext,SparkConf}
object chooseFeatures {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("dataReduce").setMaster("local")
val sc = new SparkContext(conf)
val data = Array(Vectors.dense(1,2,3,4,5,6,7,8,9),
Vectors.dense(5,6,7,8,9,0,8,6,7),
Vectors.dense(9,0,8,7,1,4,3,2,1),
Vectors.dense(6,4,2,1,3,4,2,1,5),
Vectors.dense(4,5,7,1,4,0,2,1,8))
val dataRDD = sc.parallelize(data,2)
val mat : RowMatrix = new RowMatrix(dataRDD)
val svd = mat.computeSVD(3,computeU = true)
val U:RowMatrix = svd.U
val s:Vector = svd.s
val V:Matrix = svd.V
println("U: " ,U)
println("V: " ,V)
println("s: " ,s)
val pc:Matrix = mat.computePrincipalComponents(3)
println("pc: ",pc)
/*
output:
(U: ,[email protected])
(V: ,-0.33309047675110115 0.6307611082680837 0.10881297540284612
-0.252559026169606 -0.13320654554805747 0.4862541277385016
-0.3913180354223819 0.3985110846022322 0.20656596253983592
-0.33266751598925126 0.25621153877501424 -0.3575093420454635
-0.35120996186827147 -0.24679309180949208 0.16775460006130793
-0.1811460330545444 0.03808707142157401 -0.46853660508460787
-0.35275045425261 -0.19100365291846758 -0.26646095393100677
-0.2938422406906167 -0.30376401501983874 -0.4274842789454556
-0.44105410502598985 -0.4108875465911952 0.2825275707788212 )
(s: ,[30.88197557931219,10.848035248251415,8.201924156089822])
(pc: ,-0.3948204553820511 -0.3255749878678745 0.1057375753926894
0.1967741975874508 0.12066915005125914 0.4698636365472036
-0.09206257474269655 -0.407047128194367 0.3210095555021759
0.12315980051885281 -0.6783914405694824 -0.10049065563002131
0.43871546256175087 -0.12704705411702932 0.2775911848440697
-0.05209780173017968 0.10583033338605327 -0.6473697692806737
0.422474587406277 -0.27600606797384 -0.13909137208338707
0.46536643478632944 -0.172268807944553 -0.349731653791416
0.4376262507870099 0.3469015236606571 0.13076351966313637 )
*/
構建模型
演算法選擇的原則:
- 業務需求、資料特徵、演算法適應性。個人經驗等
- 選擇幾種演算法進行比較
- 採用整合學習的方式,複合多種演算法也是選項之一。如:先採用聚類方法對資料進行聚類,然後對不同的類別資料進行預測和推薦
- 從簡單和熟悉的演算法入手,然後不斷的完善和優化
spark ML 目前支援的演算法
型別 | spark ML 目前支援的演算法 |
---|---|
分類 | 邏輯迴歸,分二項邏輯迴歸 (Binomial logistic regression) 和多項邏輯迴歸(Multinomial logistic regression) |
分類 | 決策樹分類 (Decision tree classifier) |
分類 | 隨機森林分類 (Random forest classifier) |
分類 | 梯度提升決策樹分類 (Gradient-boosted tree classifier) |
分類 | 多層感知機分類 (Multilayer perceptron classifier) |
分類 | 一對多分類 (One-vs-Rest classifier) |
分類 | 樸素貝葉斯 (native Bayes) |
迴歸 | 線性迴歸 (Linear regression) |
迴歸 | 廣義線性迴歸 (Generalized liner regression) |
迴歸 | 決策樹迴歸 (Decision tree regression) |
迴歸 | 隨機森林迴歸 (Random forest regression) |
迴歸 | 梯度提升決策樹迴歸 (Gradient-boosted tree regression) |
迴歸 | 生存迴歸 (Survival regression) |
迴歸 | 保序迴歸 (Isotonic regression) |
推薦 | 協同過濾 (Collaborative filtering) |
聚類 | K-均值(k-means) |
聚類 | 高斯混合模型 (Gaussian Mixture Model) |
聚類 | 主題模型 (latent Dirichlet allocation LDA) |
聚類 | 二分 K 均值 (bisecting k-means) |
- 演算法確定了還需要設定一些引數,如訓練決策樹的時候需要選擇迭代次數、純度計算方法、樹的最大高度等
- 資料劃分為訓練資料和測試資料,訓練資料用來訓練模型,測試資料用來驗證模型,這種驗證方式屬於交叉驗證(CrossValidator CV)
- K-CV (K-fold Cross Validator) K 折交叉驗證,不重複地隨機將資料劃分為 k 份,如 K = 3,則將產生 3 個(訓練/測試) 資料集對,每個資料集使用 2/3 的資料進行訓練,1/3 的資料進行測試。這樣就會得到3個模型,用這 3 個模型的平均數作為最終模型的效能指標。K-CV 可以有效的避免欠學習狀態的發生,其結果比較有說服力
- spark 提供了多種資料劃分的方法:randomSplit、CrossValidator等
模型評估方法
- 對模型的效能、與目標的切合度等進行評估
- 評估指標:精確度,ROC,RMSE等,這些指標是重要而基礎的,但不是唯一和最終指標,除了這些指標,我們還應該評估模型對業務的提示或商業目標的達成等方面的貢獻
- spark 評估演算法:
- 均方差 (MSE,Mean Squared Error)
- 均方差更 (RMSE Root Mean Squared Error)
- 平均絕對值誤差 (MAE,Mean Absolue Error)
- 混淆矩陣(confusion matrix):簡單的矩陣,用於展示一個二分類器的預測結果,其中 T 為 True,F 為 False、N 為 Negative(負樣本)、P 為 Postitive(正樣本)
- 真正(TP):被模型預測為正的正樣本數(本身是正樣本,模型預測也是正樣本),可以稱作判斷為真的正確率
- 真負(TN):被模型預測為負的負樣本數(本身是負樣本,模型預測也是負樣本),可以稱作判斷為假的正確率
- 假正(FP):被模型預測為正的負樣本數,(本身是負樣本,模型預測是正樣本),可以稱作誤報率
- 假負(FN):被模型預測為負的正樣本數,(本身是正樣本,模型預測是負樣本),可以稱作為漏報率
- 評估指標:
- 準確率(Precision):反映了被分類器判定的正例中真正的正例樣本的比重 P = TP/(TP+FP)
- 錯誤率(Error):模型預測錯誤佔整個正負樣本的比重。E = (FP+FN) / (P+N) = (FP+FN) / (FP+FN+TP+TN)
- 正確率(Accuracy):模型預測正確佔整個正負樣本的比重。 A = (TP+TN) / (P+N) = (TP+TN+FN+FP)
- 召回率(Recall):反映了被正確判定的正例佔總正例的的比重 R = (TP) / (TP + FN)
- A + E = 1
- F1-Measure f-measure是一種統計量,F-Measure又稱為F-Score,F-Measure是Precision(準確率)和Recall(召回率)加權調和平均
- 真陽率(TPR):代表分類器預測的正類中實際正例項佔所有正例項的比重。TPR = TP/(TP+FN)
- 假陽率(FPR):程式碼分類器預測的正類中實際負例項佔所有負例項的比重。FRP = FP/(FP+TN)
package spark.mllib
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.{
BinaryClassificationEvaluator,
MulticlassClassificationEvaluator
}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
object modelAssess {
def main(args: Array[String]): Unit = {
val path = args(0)
// sparkSession 是 Spark SQL 的入口,資料結構是 DataFrame。sc Spark CORE 的入口,資料結構是 RDD,從 Spark 2.0 及以後應儘量使用 DataFrame和DataSet
val spark: SparkSession = SparkSession.builder
.appName("modelAsess")
.master("yarn")
.config("spark.testing.memory", "471859200")
.getOrCreate
// DataFrame.read return DataFrameReader。read 預設支援的格式 parquet,hive 支援的列式儲存的檔案格式
// format(source: String) 指定源資料的格式
// load(path: String) 載入資料
// data : dataFrame
val data = spark.read.format("libsvm").load(path)
// randomSplit(weight: Array[Double],seed : Long)
// 按照 Double 型別的陣列提供的權重值來隨機切分資料,seed 可以理解為新增的雜誌,增加隨機性
val Array(trainingData, testData) =
data.randomSplit(Array(0.7, 0.3), seed = 1234L)
// LogisticRegression 是一個類
//.setThreshold(value : Double) 二分類中,設定閥值,概率大於該值的預測為1,概率小於該值預測為0。預設值:0.5.如果此值過大,那麼很多標籤都會被預測為0,如果此值過小,很多標籤被預測為1
// setMaxIter(value: Int) 設定最大迭代次數,預設是100
// setRegParam(value : Double) 設定正則化引數,預設是 0.0
// setElasticNetParam(value: Double) value = 0.0 使用 L2 正則化,如果value = 1.0 使用 L1 正則化,如果 0.0 < value < 1.0 使用 L1 和 L2 組合的正則化。注意如果是 fit 優化只支援 L2 正則化
// 正則化主要是對權重比較大的特徵進行懲罰,避免過度依賴某個特徵造成過擬合
val lr = new LogisticRegression()
.setThreshold(0.6)
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// fit(dataset: DataSet) 使用這個構建模型,輸入資料是訓練資料
val lrMode = lr.fit(trainingData)
// transform(dataSet: DataSet) : DataFrame 將測試資料輸入開始預測
val predictions = lrMode.transform(testData)
predictions.show()
// BinaryClassificationEvaluator 這個類是用來評估二分類演算法構建的模型的預測效果,有兩個期望輸入列:label 標籤列和 rawPrediction
// setLabelCol 設定標籤列的列名
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
// evaluate(dataSet: DataSet) : Double 評估模型的預測結果,返回一個度量值
// def isLargerBetter: Boolean true:評估返回的指標應最大化,false:評估返回的值應最小化
val accuracy = evaluator.evaluate(predictions)
// RegressionMertrics 這個類是用來評估迴歸模型
// new RegressionMetrics(predictionAndObservations: RDD[(Double, Double)])
/*
val dataFrame1 = predictions.select("prediction","label")
dataFrame1 : org.apache.spark.sql.DataFrame
val rdd1 = dataFram1.rdd
rdd1 : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
rdd1.rdd.map(x =>(x(0).asInstanceOf[Double],x(1).asInstanceOf[Double]))
return org.apache.spark.rdd.RDD[(Double, Double)]
x :org.apache.spark.sql.Row = [0.0,0.0]
x(0) : Any = 0.0
x(1) : Any = 0.0
x(0).asInstanceOf[Double] 將傳入的物件轉換成 Double 型別,這裡就是將 Any 轉換成 Double 型別
*/
val rm2 = new RegressionMetrics(
predictions
.select("prediction", "label")
.rdd
.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
/*
def meanSquaredError: Double
def meanAbsoluteError: Double
def rootMeanSquaredError: Double
*/
println("MSE: ", rm2.meanSquaredError)
println("MAE: ", rm2.meanAbsoluteError)
println("RMSE Squared: ", rm2.rootMeanSquaredError)
val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
val multiclassClassificationEvaluator: MulticlassClassificationEvaluator =
new MulticlassClassificationEvaluator()
printlnMetricMulti("f1", predictions, multiclassClassificationEvaluator)
printlnMetricMulti("weightedPrecision",
predictions,
multiclassClassificationEvaluator)
printlnMetricMulti("weightedRecall",
predictions,
multiclassClassificationEvaluator)
printlnMetricMulti("accuracy",
predictions,
multiclassClassificationEvaluator)
printlnMetricbinary("areaUnderROC",
binaryClassificationEvaluator,
predictions)
printlnMetricbinary("areaUnderPR",
binaryClassificationEvaluator,
predictions)
// A error of "value $ is not StringContext member" is reported if you don't add following line
import spark.implicits._
// 計算 TP,分類正確且分類為1的樣本數
println(
predictions
.filter($"label" === $"prediction")
.filter($"label" === 1)
.count)
// 計算 TN,分類正確且分類為0的樣本數
println(
predictions
.filter($"label" === $"prediction")
.filter($"prediction" === 0)
.count)
// 計算 FN,分類錯誤且分類為0的樣本數
println(
predictions
.filter($"label" !== $"prediction")
.filter($"prediction" === 0)
.count)
// 計算 FP,分類錯誤且分類為1的樣本數
println(
predictions
.filter($"label" !== $"prediction")
.filter($"prediction" === 1)
.count)
/*
準確率: TP(TP+FP) = 17/(17+0) = 1
召回率: TP(TP+FN) = 17/(17+1) = 0.944444
*/
}
// 計算準確率、召回率、正確率、F1
def printlnMetricMulti(
metricsName: String,
predictions: DataFrame,
multiclassClassificationEvaluatdor: MulticlassClassificationEvaluator)
: Unit = {
/*
val multiclassClassificationEvaluatdor = new MulticlassClassificationEvaluator()
setMetricName(metricName : String) : BinaryClassificationEvaluator.this.type 設定評估指標的名字
evaluate(predictions) 這個同上返回一個 Double的值,該指標具體的值
*/
println(
metricsName + " = " + multiclassClassificationEvaluatdor
.setMetricName(metricsName)
.evaluate(predictions))
}
// 計算 AUC(area under ROC ROC 曲線下的區域的面積),area under PR PR曲線的面積
def printlnMetricbinary(
metricsName: String,
binaryClassificationEvaluator: BinaryClassificationEvaluator,
predictions: DataFrame): Unit = {
println(
metricsName + " = " + binaryClassificationEvaluator
.setMetricName(metricsName)
.evaluate(predictions))
}
}
/*
程式碼叢集方式提交
#!/bin/bash
cd /data/project/spark/spark_workstation
/data/sbt/bin/sbt compile && /data/sbt/bin/sbt package && \
/data/spark/bin/spark-submit --master yarn \
--deploy-mode cluster \
--verbose \
--num-executors 2 \
--executor-memory '1024m' \
--executor-cores 1 \
--class spark.mllib.modelAssess ./target/scala-2.11/mergedata_2.11-2.2.1.jar \
hdfs://master:9000/sparkMLlib/sample_libsvm_data.txt
*/
組裝
將資料的清洗、轉換等資料的預處理工作,以及構建模型和評估模型這些任務當做 spark Pipeline的 stage,這樣既可以保證各任務之間有序執行,也保證的處理資料的資料的一致性
// 建立 Pipeline,將各個 Stage 依次組裝在一起
val pipeline = new Pipeline().setStages(Array(tokenizer,hashingTF,lr))
// 在訓練集上擬合這個 Pipeline
val model = pipeline.fit(training)
// 在測試集上做預測
model.transform(test).select("label","prediction")
模型選擇或調優
調優:使用給定的資料為給定的任務尋找最適合的模型或引數,調優可以是對單個階段進行除錯,也可以一次性對整個Pipeline 進行調優
MLlib 支援使用型別CorssValidator 和 TrainValidationSplit 這樣的工具進行模型選擇,這類工具有一下元件
- Estimator:使用者調優的演算法或者Pipeline
- ParamMap 集合:提供引數的選擇,有時又稱使用者查詢的引數網格(parameter grid),引數網格可以使用 ParamGridBuilder 來構建
- Evaluator:衡量模型在測試資料上的擬合程度
模型選擇工具工作原理如下:
- 對輸入資料劃分為訓練資料和測試資料
- 對於每個(訓練/測試)對,遍歷一組ParamMaps。用每一個 ParamMap 引數來擬合估計器,得到訓練後的模型,再使用評估器來評估模型的表現
- 選擇效能表現最優的模型對應引數表
交叉驗證(CrossValidator):
- 交叉驗證:將資料切分成 K 折資料集合,分別用於訓練資料和測試資料
- 如果 K = 3 就會有3份 訓練/測試資料對。每一份資料對,其中訓練資料佔 2/3,測試資料佔 1/3,為了評估一個 ParamMap,CrossValidator 會計算這 3 個不同的(訓練,測試) 資料集對在 Estimator 擬合出的模型上平均評估指標
- 在找出最好的 ParamMap後,CrossValidator 會利用此 ParamMap 在整個訓練機上訓練(fit) 出一個泛華能力強、誤差相對小的最佳模型,整個過程處於流程化管理之中,工作流程圖如下
交叉驗證的缺點:雖然利用 CrossValidator 來訓練模型可以提升泛華能力,但其代價比較高。如果 K =3 regParam = (0.1,0.01)、numiters = (10,20) 這樣就需要對模型訓練 3*2*2 = 12 次。然而對比啟發式的手動調優,這是選擇引數的行之有效的方法
訓練驗證切分 (TrainValidationSplit)
- TrainValidation 建立單一的(訓練、測試)資料集對,它適用 trainRatio 引數將資料集切分成兩部分。並最終使用最好的 ParamMap 和 完整的資料集來擬合評估器
- 例如: trainTatio = 0.8 TrainValidationSplit 80% 作為訓練資料集,20%作為測試資料集
- TrainValidation 優點就是隻對每對引數組合評估1次,因此效能比較好,但是當訓練資料集不夠大的時候其結果相對不可信
儲存模型
- 儲存擬合後的流水線到磁碟上
model.write.overwrite().save("/tmp/spark-logistic-regreesion-model")
- 儲存未擬合的流水線到磁碟上
pipeline.write.overwrite().save("/tmp/spark-logistic-regression-model1")
- 把擬合後的流水線部署到其他環境中
val sameMode = PiplelineModel.load("/tmp/spark-logistic-regreesion-model")