
Machine Learning with PySpark (Regression)

DecisionTreeRegressor

class pyspark.ml.regression.DecisionTreeRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", seed=None, varianceCol=None)

Supports both continuous and categorical features.

Parameters

fit(dataset, params=None) method
impurity: criterion used for information gain calculation; supported option: variance
maxBins: maximum number of bins for discretizing continuous features; must be >= 2 and >= the number of categories of any categorical feature
maxDepth: maximum depth of the tree
minInfoGain: minimum information gain required to split a node
minInstancesPerNode: minimum number of instances each child must have after a split
Setter and getter methods (a minimal sketch follows this list)
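
Every parameter above also has a corresponding setter and getter on the estimator. A minimal sketch, assuming a running SparkSession (method names follow PySpark's setX/getX convention):

from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor()
dt.setMaxDepth(3)        # setter for the maxDepth param
print(dt.getMaxDepth())  # 3
print(dt.getImpurity())  # variance (the default)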

Methods and attributes of the fitted model

depth: depth of the decision tree
featureImportances: estimated importance of each feature (see DecisionTreeClassifier)
numFeatures: number of features the model was trained on; returns -1 if unknown
numNodes: number of nodes in the decision tree
transform(dataset, params=None) method

Code

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import DecisionTreeRegressor, DecisionTreeRegressionModel
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)),(0.0, Vectors.sparse(1, [], []))], ["label", "features"])
dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
model = dt.fit(df)
model.depth
#1
model.numNodes
#3
model.featureImportances
#SparseVector(1, {0: 1.0})
model.numFeatures
#1
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
dtr_path = temp_path + "/dtr"
dt.save(dtr_path)
dt2 = DecisionTreeRegressor.load(dtr_path)
dt2.getMaxDepth()
#2
model_path = temp_path + "/dtr_model"
model.save(model_path)
model2 = DecisionTreeRegressionModel.load(model_path)
model.numNodes == model2.numNodes
#True
model.depth == model2.depth
#True
model.transform(test1).head().variance
#0.0

GBTRegressor

class pyspark.ml.regression.GBTRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, subsamplingRate=1.0, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1, seed=None, impurity="variance")

Parameters

fit(dataset, params=None) method
impurity: same as DecisionTreeRegressor
lossType: loss function to be minimized by GBT; options: squared, absolute (see the sketch after this list)
maxBins: same as DecisionTreeRegressor
maxDepth: same as DecisionTreeRegressor
maxIter: maximum number of iterations
minInfoGain: same as DecisionTreeRegressor
minInstancesPerNode: same as DecisionTreeRegressor
stepSize: step size (learning rate) for each optimization iteration
subsamplingRate: fraction of the training data used to train each decision tree, in the range (0, 1]
Setter and getter methods
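
A minimal sketch of the two loss options (the parameter values here are illustrative, not recommendations):

from pyspark.ml.regression import GBTRegressor

# squared loss (the default) penalizes large errors quadratically
gbt_squared = GBTRegressor(lossType="squared", maxIter=10, stepSize=0.1)
# absolute loss is more robust to outliers
gbt_absolute = GBTRegressor(lossType="absolute", maxIter=10, stepSize=0.1)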

Methods and attributes of the fitted model

featureImportances: same as DecisionTreeRegressor
getNumTrees: number of trees in the ensemble
numFeatures: same as DecisionTreeRegressor
totalNumNodes: total number of nodes across all trees in the ensemble
transform(dataset, params=None) method
treeWeights: weight of each tree
trees: the trees in the ensemble

Code

from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor, GBTRegressionModel
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)), (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
print(gbt.getImpurity())
#variance
model = gbt.fit(df)
model.featureImportances
#SparseVector(1, {0: 1.0})
model.numFeatures
#1
allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)],["features"])
model.transform(test1).head().prediction
#1.0
gbtr_path = temp_path + "/gbtr"
gbt.save(gbtr_path)
gbt2 = GBTRegressor.load(gbtr_path)
gbt2.getMaxDepth()
#2
model_path = temp_path + "/gbtr_model"
model.save(model_path)
model2 = GBTRegressionModel.load(model_path)
model.featureImportances == model2.featureImportances
#True
model.treeWeights == model2.treeWeights
#True
model.trees
#[DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]

GeneralizedLinearRegression

class pyspark.ml.regression.GeneralizedLinearRegression(self, labelCol="label", featuresCol="features", predictionCol="prediction", family="gaussian", link=None, fitIntercept=True, maxIter=25, tol=1e-6, regParam=0.0, weightCol=None, solver="irls", linkPredictionCol=None)

Fits a generalized linear model specified by a family and a link function.
Supported link functions for each family (the first listed is the default; a usage sketch follows this list):
• "gaussian" -> "identity", "log", "inverse"
• "binomial" -> "logit", "probit", "cloglog"
• "poisson" -> "log", "identity", "sqrt"
• "gamma" -> "inverse", "identity", "log"
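
A minimal sketch of pairing a non-default family and link, assuming an active SparkSession named spark (the data is made up for illustration):

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression

# hypothetical count data: the label is a non-negative count
count_df = spark.createDataFrame([
    (0.0, Vectors.dense(0.0, 1.0)),
    (1.0, Vectors.dense(1.0, 2.0)),
    (2.0, Vectors.dense(2.0, 3.0)),
], ["label", "features"])

# family="poisson" defaults to the "log" link; "identity" and "sqrt" are also valid
poisson_glr = GeneralizedLinearRegression(family="poisson", link="log", maxIter=10)
poisson_model = poisson_glr.fit(count_df)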

Parameters

fit(dataset, params=None) method
family: name of the error distribution; options: gaussian (default), binomial, poisson, gamma
fitIntercept: whether to fit an intercept term
link: name of the relationship between the linear predictor and the mean of the distribution function; options: identity, log, inverse, logit, probit, cloglog, sqrt
maxIter: maximum number of iterations
solver: optimization algorithm; "auto" is used if unset or empty
tol: convergence tolerance of the iterations
Setter and getter methods

Methods and attributes of the fitted model (experimental)

coefficients: model coefficients
evaluate(dataset): evaluates the model on a test dataset
hasSummary: whether a training summary is available
intercept: model intercept
numFeatures: number of features the model was trained on
summary: retrieves the training summary
transform(dataset, params=None) method

Summary properties

aic: Akaike Information Criterion (AIC) of the model
degreesOfFreedom: degrees of freedom
deviance: deviance of the fitted model
dispersion: dispersion of the fit; 1.0 for the binomial and poisson families, otherwise estimated by the Pearson chi-squared statistic of the residuals
predictions: predictions produced by the model's transform method
rank: numerical rank of the fitted linear model
residualDegreeOfFreedom: residual degrees of freedom
residuals(residualType='deviance'): residuals of the fitted model; residualType selects the type returned; options: deviance, pearson, working, response

trainingSummary properties (a usage sketch follows this list)

aic: AIC
coefficientStandardErrors: standard errors of the estimated coefficients and intercept
degreesOfFreedom
deviance
dispersion
pValues: two-sided p-values of the estimated coefficients and intercept
predictions
rank
residualDegreeOfFreedom
residuals(residualType='deviance')
solver
tValues: t-statistics of the estimated coefficients and intercept
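
A minimal sketch of reading the training summary, assuming model is a fitted GeneralizedLinearRegressionModel such as the one produced in the code below:

# guard with hasSummary: the summary only exists on a freshly trained model
if model.hasSummary:
    summary = model.summary
    print(summary.aic)          # Akaike Information Criterion
    print(summary.deviance)     # deviance of the fitted model
    print(summary.pValues)      # two-sided p-values of coefficients and intercept
    summary.residuals().show()  # deviance residuals by default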

Code

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression, GeneralizedLinearRegressionModel
df = spark.createDataFrame([ (1.0, Vectors.dense(0.0, 0.0)), (1.0, Vectors.dense(1.0, 2.0)),(2.0, Vectors.dense(0.0, 0.0)),(2.0, Vectors.dense(1.0, 1.0)),], ["label", "features"])
glr = GeneralizedLinearRegression(family="gaussian", link="identity", linkPredictionCol="p")
model = glr.fit(df)
transformed = model.transform(df)
abs(transformed.head().prediction - 1.5) < 0.001
#True
abs(transformed.head().p - 1.5) < 0.001
#True
model.coefficients
#DenseVector([1.5..., -1.0...])
model.numFeatures
#2
abs(model.intercept - 1.5) < 0.001
#True
glr_path = temp_path + "/glr"
glr.save(glr_path)
glr2 = GeneralizedLinearRegression.load(glr_path)
glr.getFamily() == glr2.getFamily()
#True
model_path = temp_path + "/glr_model"
model.save(model_path)
model2 = GeneralizedLinearRegressionModel.load(model_path)
model.intercept == model2.intercept
#True
model.coefficients[0] == model2.coefficients[0]
#True

LinearRegression

class pyspark.ml.regression.LinearRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2)

Supports several types of regularization (see the sketch after this list):
• none: OLS (ordinary least squares)
• L2: ridge regression
• L1: Lasso regression
• L1 + L2: elastic net
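
A minimal sketch of how regParam and elasticNetParam select the regularization type (the values 0.1 and 0.5 are illustrative):

from pyspark.ml.regression import LinearRegression

# regParam=0.0: no penalty, i.e. OLS
ols = LinearRegression(regParam=0.0)
# elasticNetParam=0.0 (default): pure L2 penalty, i.e. ridge
ridge = LinearRegression(regParam=0.1, elasticNetParam=0.0)
# elasticNetParam=1.0: pure L1 penalty, i.e. Lasso
lasso = LinearRegression(regParam=0.1, elasticNetParam=1.0)
# 0 < elasticNetParam < 1: a mix of L1 and L2, i.e. elastic net
enet = LinearRegression(regParam=0.1, elasticNetParam=0.5)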

Parameters

aggregationDepth: depth for tree aggregation, >= 2
elasticNetParam: ElasticNet mixing parameter in the range [0, 1]; alpha = 0 is an L2 penalty, alpha = 1 is an L1 penalty
fit(dataset, params=None) method
fitIntercept: whether to fit an intercept
maxIter: maximum number of iterations
regParam: regularization parameter, >= 0
solver: optimization algorithm; "auto" is used if unset or empty
standardization: whether to standardize the features before fitting the model
Setter and getter methods

Methods and attributes of the fitted model

coefficients
evaluate(dataset)
hasSummary
intercept
numFeatures
summary
transform(dataset, params=None)

Summary properties (experimental)

coefficientStandardErrors
devianceResiduals: weighted residuals
explainedVariance: explained variance regression score, explainedVariance = 1 - variance(y - ŷ) / variance(y)
meanAbsoluteError: mean absolute error
meanSquaredError: mean squared error
numInstances: number of instances predicted
pValues: two-sided p-values of the coefficients and intercept; only available when using the "normal" solver
predictions: predictions returned by the model's transform method
r2: R-squared
residuals: residuals
rootMeanSquaredError: root mean squared error
tValues: t-statistics

trainingSummary properties (a usage sketch follows this list)

coefficientStandardErrors
devianceResiduals
explainedVariance
meanAbsoluteError
meanSquaredError
numInstances
objectiveHistory: objective function value at each iteration; only available when using the "l-bfgs" solver
pValues
predictions
r2
residuals
rootMeanSquaredError
tValues
totalIterations: total number of iterations before completion
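
A minimal sketch of reading the training summary, assuming model is a fitted LinearRegressionModel such as the one produced in the code below:

# the summary only exists on a freshly trained model, hence the guard
if model.hasSummary:
    summary = model.summary
    print(summary.rootMeanSquaredError)  # RMSE on the training data
    print(summary.r2)                    # coefficient of determination
    print(summary.numInstances)          # number of training instances
    print(summary.pValues)               # only with the "normal" solver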

Code

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
df = spark.createDataFrame([ (1.0, 2.0, Vectors.dense(1.0)),(0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"])
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight")
model = lr.fit(df)
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
abs(model.transform(test0).head().prediction - (-1.0)) < 0.001
#True
abs(model.coefficients[0] - 1.0) < 0.001
#True
abs(model.intercept - 0.0) < 0.001
#True
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
abs(model.transform(test1).head().prediction - 1.0) < 0.001
#True
lr.setParams("vector")
#Traceback (most recent call last):
#    ...
#TypeError: Method setParams forces keyword arguments.
lr_path = temp_path + "/lr"
lr.save(lr_path)
lr2 = LinearRegression.load(lr_path)
lr2.getMaxIter()
#5
model_path = temp_path + "/lr_model"
model.save(model_path)
model2 = LinearRegressionModel.load(model_path)
model.coefficients[0] == model2.coefficients[0]
#True
model.intercept == model2.intercept
#True
model.numFeatures
#1

RandomForestRegressor

class pyspark.ml.regression.RandomForestRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", subsamplingRate=1.0, seed=None, numTrees=20, featureSubsetStrategy="auto")

Parameters

fit(dataset, params=None) method
featureSubsetStrategy: number of features to consider for splits at each tree node; options: auto, all, onethird, sqrt, log2, a fraction in (0.0, 1.0], or an integer n >= 1 (see the sketch after this list)
impurity: criterion used for information gain calculation; supported option: variance
maxBins: maximum number of bins for discretizing continuous features
maxDepth: maximum depth of the tree
minInfoGain: minimum information gain required to split a tree node
minInstancesPerNode: minimum number of instances each node must have after a split
numTrees: number of trees to train
subsamplingRate: fraction of the training data used to learn each decision tree
Setter and getter methods
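
A minimal sketch of the featureSubsetStrategy options (the values passed are illustrative; fractions are given as strings):

from pyspark.ml.regression import RandomForestRegressor

# consider all features at every split
rf_all = RandomForestRegressor(numTrees=10, featureSubsetStrategy="all")
# consider sqrt(numFeatures) features at every split
rf_sqrt = RandomForestRegressor(numTrees=10, featureSubsetStrategy="sqrt")
# consider 50% of the features at every split
rf_half = RandomForestRegressor(numTrees=10, featureSubsetStrategy="0.5")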

Methods and attributes of the fitted model

featureImportances: see DecisionTreeRegressor
numFeatures: number of features in the model; returns -1 if unknown
totalNumNodes: total number of nodes
transform(dataset, params=None) method
treeWeights: weight of each tree
trees: the trees in the ensemble

Code

from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)), (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
rf = RandomForestRegressor(numTrees=2, maxDepth=2, seed=42)
model = rf.fit(df)
model.featureImportances
#SparseVector(1, {0: 1.0})
allclose(model.treeWeights, [1.0, 1.0])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
model.numFeatures
#1
model.trees
#[DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]
model.getNumTrees
#2
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#0.5
rfr_path = temp_path + "/rfr"
rf.save(rfr_path)
rf2 = RandomForestRegressor.load(rfr_path)
rf2.getNumTrees()
#2
model_path = temp_path + "/rfr_model"
model.save(model_path)
model2 = RandomForestRegressionModel.load(model_path)
model.featureImportances == model2.featureImportances
#True