Machine Learning with PySpark (Classification)
阿新 · Published: 2018-12-30
LogisticRegression
class pyspark.ml.classification.LogisticRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol="probability", rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto")
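Parameters
regParam: regularization parameter (>= 0)
elasticNetParam: ElasticNet mixing parameter alpha, in [0, 1]. With alpha = 0 the penalty is L2 regularization; with alpha = 1 it is L1 regularization
fitIntercept: whether to fit an intercept term
standardization: whether to standardize the training features before fitting the model
aggregationDepth: suggested depth for treeAggregate (>= 2)
family: name of the label distribution family; options: "auto", "binomial", "multinomial"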
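To make the interaction between regParam and elasticNetParam concrete, here is a minimal sketch of the elastic net penalty regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2), which is, as I understand it, what Spark adds to the logistic loss. The helper name elastic_net_penalty is hypothetical. The sketch leaves out the intercept (which is not regularized) and ignores the rescaling that standardization=True applies to the coefficients.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Hypothetical helper: elastic net penalty on a coefficient vector.

    penalty = regParam * (alpha * L1 + (1 - alpha) / 2 * L2^2),
    where alpha is elasticNetParam. The intercept is not included.
    """
    if reg_param < 0:
        raise ValueError("regParam must be >= 0")
    if not 0.0 <= elastic_net_param <= 1.0:
        raise ValueError("elasticNetParam must be in [0, 1]")
    l1 = sum(abs(w) for w in weights)         # ||w||_1
    l2_squared = sum(w * w for w in weights)  # ||w||_2^2
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) * 0.5 * l2_squared)


w = [0.5, -2.0]
print(elastic_net_penalty(w, 0.1, 0.0))  # alpha = 0: pure L2 penalty
print(elastic_net_penalty(w, 0.1, 1.0))  # alpha = 1: pure L1 penalty
```

For alpha strictly between 0 and 1 the penalty interpolates linearly between the two extremes, and regParam = 0 switches regularization off entirely.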
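Getter and setter methods for every parameter
Methods and attributes of the fitted model
coefficientMatrix: the model's coefficient matrix (for multinomial)
coefficients: model coefficients of binomial logistic regression; an exception is thrown if the model is multinomial
evaluate(dataset): evaluates the model on a test dataset and returns a summary
hasSummary: whether a training summary exists
intercept: intercept of the binomial logistic model
interceptVector: intercept vector of the multinomial logistic model
summary: gets the training summary; an exception is thrown if no summary exists
transform(dataset, params=None)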
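What transform computes from these attributes can be sketched in plain Python. For a binomial model the margin is intercept + coefficients · x, the probability of class 1 is its sigmoid, and (to my understanding of Spark's implementation) rawPrediction is [-margin, margin]; for a multinomial model each class has one row of coefficientMatrix and one entry of interceptVector, and a softmax turns the margins into probabilities. Both helper names are hypothetical, and the prediction rule p1 > threshold is an assumption for the default case where thresholds is not set.

```python
import math


def binomial_scores(coefficients, intercept, features, threshold=0.5):
    """Hypothetical helper: score one row like a binomial logistic model.

    Returns (rawPrediction, probability, prediction).
    """
    margin = intercept + sum(c * x for c, x in zip(coefficients, features))
    p1 = 1.0 / (1.0 + math.exp(-margin))  # sigmoid of the margin = P(label = 1)
    raw_prediction = [-margin, margin]
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction


def multinomial_probabilities(coefficient_matrix, intercept_vector, features):
    """Hypothetical helper: softmax over one margin per class.

    Each row of coefficient_matrix holds the coefficients of one class.
    """
    margins = [b + sum(c * x for c, x in zip(row, features))
               for row, b in zip(coefficient_matrix, intercept_vector)]
    top = max(margins)  # subtract the largest margin for numerical stability
    exps = [math.exp(m - top) for m in margins]
    total = sum(exps)
    return [e / total for e in exps]


print(binomial_scores([2.0], -1.0, [1.0]))
print(multinomial_probabilities([[1.0], [0.0], [-1.0]], [0.0, 0.0, 0.0], [2.0]))
```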
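Summary attributes
predictions: the DataFrame of predictions output by the model's transform method
probabilityCol: name of the column in predictions that gives the probability of each class
Additional attributes of the binary (binomial) summary:
areaUnderROC: the area under the ROC curve (AUC)
fMeasureByThreshold: a DataFrame with two columns (threshold, F-Measure), computed with beta = 1.0
pr: a DataFrame with two columns (recall, precision) forming the precision-recall curve
precisionByThreshold: a DataFrame with two columns (threshold, precision); every probability that appears in the transformed data is used as a threshold when computing precision
recallByThreshold: a DataFrame with two columns (threshold, recall); every probability that appears in the transformed data is used as a threshold when computing recall
roc: a DataFrame with two columns (FPR, TPR)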
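The threshold-based summaries above all come from the same idea: treat every distinct predicted probability as a cut-off, count true and false positives, and derive the metric. The sketch below does that in plain Python for ROC points, a trapezoidal AUC, and precision/recall/F-measure by threshold. All function names are hypothetical, and Spark's BinaryClassificationMetrics may add curve endpoints or bin thresholds differently, so treat this as an illustration of the definitions rather than a reimplementation.

```python
def _counts_at(scores, labels, threshold):
    """True/false positives when rows with score >= threshold are predicted 1."""
    tp = sum(1 for s, y in zip(scores, labels) if s >= threshold and y == 1.0)
    fp = sum(1 for s, y in zip(scores, labels) if s >= threshold and y != 1.0)
    return tp, fp


def roc_points(scores, labels):
    """(FPR, TPR) pairs, using every distinct score as a threshold."""
    pos = sum(1 for y in labels if y == 1.0)
    neg = len(labels) - pos
    points = [(0.0, 0.0)]
    for t in sorted(set(scores), reverse=True):
        tp, fp = _counts_at(scores, labels, t)
        points.append((fp / neg, tp / pos))
    return points


def area_under_roc(points):
    """Trapezoidal area under the (FPR, TPR) curve."""
    return sum((x1 - x0) * (y0 + y1) / 2.0
               for (x0, y0), (x1, y1) in zip(points, points[1:]))


def metrics_by_threshold(scores, labels, beta=1.0):
    """(threshold, precision, recall, F-measure) for every distinct score."""
    pos = sum(1 for y in labels if y == 1.0)
    b2 = beta * beta
    rows = []
    for t in sorted(set(scores), reverse=True):
        tp, fp = _counts_at(scores, labels, t)
        precision = tp / (tp + fp)
        recall = tp / pos
        denom = b2 * precision + recall
        f = (1.0 + b2) * precision * recall / denom if denom > 0 else 0.0
        rows.append((t, precision, recall, f))
    return rows


scores = [0.9, 0.8, 0.3, 0.1]  # P(label = 1) taken from the probability column
labels = [1.0, 0.0, 1.0, 0.0]
print(area_under_roc(roc_points(scores, labels)))
for row in metrics_by_threshold(scores, labels):
    print(row)
```

With these four rows, three of the four (positive, negative) pairs are ranked correctly, which is exactly the AUC of 0.75 that the trapezoid rule gives.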
Code
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.classification import LogisticRegression
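# Assumes the pyspark shell, where sc (SparkContext) and spark (SparkSession) are predefined
# Binomial logistic regression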
bdf = sc.parallelize([Row(label=1.0,weight=2.0,features=Vectors.dense(1.0)),Row(label=0.0, weight=2.0, features=Vectors.sparse(1,[],[]))]).toDF()
bdf.show()
blor = LogisticRegression(maxIter=5, regParam=0.01,weightCol='weight')
blorModel = blor.fit(bdf)
blorModel.coefficients
blorModel.intercept
# Multinomial logistic regression
mdf = sc.parallelize([Row(label=1.0,weight=2.0, features=Vectors.dense(1.0)),Row(label=0.0,weight=2.0, features=Vectors.sparse(1,[],[])),Row(label=2.,weight=2.0, features=Vectors.dense(3.0))]).toDF()
mlor=LogisticRegression(maxIter=5,regParam=0.01,weightCol='weight',family='multinomial')
mlorModel = mlor.fit(mdf)
print(mlorModel.coefficientMatrix)
mlorModel.interceptVector
# Prediction
test0=sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
result = blorModel.transform(test0).head()
result.prediction
result.probability
result.rawPrediction
test1 = sc.parallelize([Row(features=Vectors.sparse(1,[0],[1.0]))]).toDF()
blorModel.transform(test1).head().prediction
blorModel.transform(test1).show()
# Evaluation (training summary)
blorModel.summary.roc.show()
blorModel.summary.pr.show()
DecisionTreeClassifier
class pyspark.ml.classification.DecisionTreeClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", seed=None)
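Parameters
checkpointInterval: checkpoint interval (>= 1), or -1 to disable checkpointing. For example, 10 means the cache is checkpointed every 10 iterations
fit(dataset, params=None) method
impurity: criterion used to compute information gain; options: "entropy", "gini"
maxBins: maximum number of bins used to discretize continuous features; must be >= 2 and >= the number of categories of any categorical feature
maxDepth: maximum depth of the tree
minInfoGain: minimum information gain required to split a node
minInstancesPerNode: minimum number of instances each child must have after a split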
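The impurity, minInfoGain and minInstancesPerNode parameters work together when a candidate split is scored. The sketch below computes Gini impurity and base-2 entropy from per-class counts, the information gain of a split (parent impurity minus the size-weighted impurity of the children), and a validity check based on the two minimums. The helper names are hypothetical; it mirrors the textbook definitions and my understanding of how Spark rejects splits, not Spark's internal code.

```python
import math


def gini(counts):
    """Gini impurity of a node given per-class instance counts."""
    total = sum(counts)
    return 1.0 - sum((c / total) ** 2 for c in counts)


def entropy(counts):
    """Entropy (base 2) of a node given per-class instance counts."""
    total = sum(counts)
    return -sum((c / total) * math.log2(c / total) for c in counts if c > 0)


def info_gain(parent, left, right, impurity=gini):
    """Parent impurity minus the count-weighted impurity of the children."""
    n, n_left, n_right = sum(parent), sum(left), sum(right)
    return (impurity(parent)
            - n_left / n * impurity(left)
            - n_right / n * impurity(right))


def is_valid_split(left, right, gain, min_instances_per_node=1, min_info_gain=0.0):
    """Reject splits that leave too few instances in a child or gain too little."""
    return (sum(left) >= min_instances_per_node
            and sum(right) >= min_instances_per_node
            and gain >= min_info_gain)


parent, left, right = [5, 5], [4, 1], [1, 4]
print(info_gain(parent, left, right, gini), info_gain(parent, left, right, entropy))
```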
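Getter and setter methods for every parameter
Methods and attributes of the fitted model
depth: returns the depth of the tree
featureImportances: estimate of the importance of each feature, computed as follows:
• importance of feature j = the sum of the gains of all nodes that split on feature j, where each gain is scaled by the number of instances passing through that node
• the importances are normalized to sum to 1 over the whole tree
transform(dataset, params=None) method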
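The featureImportances rule above can be written out directly. In this sketch a tree is reduced to a list of internal nodes, each described by a hypothetical (feature_index, info_gain, num_instances) tuple; the gains are scaled by node size, summed per feature and normalized to 1. The function name and the tuple layout are assumptions made for illustration, not Spark's data structures.

```python
def tree_feature_importances(split_nodes, num_features):
    """Hypothetical helper: importances of a single decision tree.

    split_nodes: one (feature_index, info_gain, num_instances) tuple per
    internal node, where num_instances is how many training rows reached it.
    """
    importances = [0.0] * num_features
    for feature, gain, count in split_nodes:
        importances[feature] += gain * count  # gain scaled by node size
    total = sum(importances)
    if total > 0:
        importances = [v / total for v in importances]  # normalize to sum 1
    return importances


# root splits on feature 0; its children split on features 1 and 0
nodes = [(0, 0.5, 10), (1, 0.2, 5), (0, 0.1, 5)]
print(tree_feature_importances(nodes, 2))
```

A tree with a single split on feature 0, like the one fitted in the example that follows, gets importance 1.0 for that feature, which is consistent with SparseVector(1, {0: 1.0}).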
Code
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)),(0.0, Vectors.sparse(1, [], []))], ["label", "features"])
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
dt = DecisionTreeClassifier(maxDepth=2, labelCol="indexed")
model = dt.fit(td)
model.numNodes
#3
model.depth
#1
model.featureImportances
#SparseVector(1, {0: 1.0})
model.numFeatures
#1
model.numClasses
#2
print(model.toDebugString)
#DecisionTreeClassificationModel (uid=...) of depth 1 with 3 nodes...
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
result = model.transform(test0).head()
result.prediction
#0.0
result.probability
#DenseVector([1.0, 0.0])
result.rawPrediction
#DenseVector([1.0, 0.0])
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
import tempfile
temp_path = tempfile.mkdtemp()  # any writable directory works
dtc_path = temp_path + "/dtc"
dt.save(dtc_path)
dt2 = DecisionTreeClassifier.load(dtc_path)
dt2.getMaxDepth()
#2
model_path = temp_path + "/dtc_model"
model.save(model_path)
model2 = DecisionTreeClassificationModel.load(model_path)
model.featureImportances == model2.featureImportances
#True
RandomForestClassifier
class pyspark.ml.classification.RandomForestClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", numTrees=20, featureSubsetStrategy="auto", seed=None, subsamplingRate=1.0)
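Parameters
checkpointInterval: same as DecisionTreeClassifier
featureSubsetStrategy: number of features to consider for splits at each tree node; options: "auto", "all", "onethird", "sqrt", "log2", a fraction in (0.0, 1.0], or an integer in [1, n]
fit(dataset, params=None) method
impurity: same as DecisionTreeClassifier
maxBins: same as DecisionTreeClassifier
maxDepth: same as DecisionTreeClassifier
minInfoGain: same as DecisionTreeClassifier
numTrees: number of trees to train
subsamplingRate: fraction of the training data used to learn each decision tree, in (0, 1]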
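The featureSubsetStrategy options translate into a number of candidate features per split. The sketch below encodes my understanding of the rules for a classifier: "auto" means "all" for a single tree and "sqrt" for a forest; a fraction f uses ceil(f * n) features; an integer string uses that many features. The capping of integers above n is an assumption, and the helper name is hypothetical.

```python
import math


def num_features_per_node(strategy, num_features, num_trees):
    """Hypothetical helper: features considered per split for a classifier."""
    if strategy == "auto":
        # one tree -> all features; a forest of classifiers -> sqrt
        strategy = "all" if num_trees == 1 else "sqrt"
    if strategy == "all":
        return num_features
    if strategy == "onethird":
        return int(math.ceil(num_features / 3.0))
    if strategy == "sqrt":
        return int(math.ceil(math.sqrt(num_features)))
    if strategy == "log2":
        return max(1, int(math.ceil(math.log2(num_features))))
    try:
        n = int(strategy)  # integer string: use that many features
    except ValueError:
        n = None
    if n is not None:
        if n < 1:
            raise ValueError("integer strategy must be >= 1")
        return min(n, num_features)  # assumption: values above n are capped
    fraction = float(strategy)  # fraction string such as "0.5"
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fractional strategy must be in (0, 1]")
    return int(math.ceil(fraction * num_features))


for s in ["auto", "all", "onethird", "sqrt", "log2", "0.5", "3"]:
    print(s, num_features_per_node(s, 10, 20))
```

Note that "1" (an integer) and "1.0" (a fraction) mean very different things: one feature versus all features.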
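Getter and setter methods for every parameter
Methods and attributes of the fitted model
featureImportances: same as DecisionTreeClassifier, averaged over all trees in the forest and normalized to sum to 1; see Hastie et al., "The Elements of Statistical Learning"
getNumTrees: number of trees
transform(dataset, params=None) method
treeWeights: returns the weight of each tree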
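How a forest turns its trees into one answer can be sketched as soft voting: as far as I understand Spark's RandomForestClassificationModel, rawPrediction is the sum of the trees' class-probability vectors and probability is that sum normalized. Feature importances are combined the same way: each tree's vector is normalized, the vectors are added, and the result is normalized again. Both helper names are hypothetical and the inputs are made-up per-tree outputs.

```python
def forest_prediction(tree_probabilities):
    """Hypothetical helper: soft voting over per-tree class probabilities.

    rawPrediction = sum of the trees' probability vectors,
    probability = rawPrediction normalized to sum to 1.
    """
    num_classes = len(tree_probabilities[0])
    raw = [sum(p[k] for p in tree_probabilities) for k in range(num_classes)]
    total = sum(raw)
    probability = [v / total for v in raw]
    prediction = float(max(range(num_classes), key=lambda k: probability[k]))
    return raw, probability, prediction


def forest_feature_importances(per_tree_importances):
    """Normalize each tree's raw importances to sum 1, add them up, renormalize."""
    num_features = len(per_tree_importances[0])
    totals = [0.0] * num_features
    for importances in per_tree_importances:
        norm = sum(importances)
        if norm > 0:
            for j in range(num_features):
                totals[j] += importances[j] / norm
    grand_total = sum(totals)
    return [v / grand_total for v in totals] if grand_total > 0 else totals


# three trees voting on one row, two classes
print(forest_prediction([[1.0, 0.0], [0.6, 0.4], [0.2, 0.8]]))
print(forest_feature_importances([[2.0, 0.0], [1.0, 1.0]]))
```

Because every tree here has weight 1.0, this is consistent with the treeWeights value [1.0, 1.0, 1.0] checked in the example that follows.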
Code
import numpy
from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
df = spark.createDataFrame([
(1.0, Vectors.dense(1.0)),
(0.0, Vectors.sparse(1, [], []))], ["label", "features"])
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
model = rf.fit(td)
model.featureImportances
#SparseVector(1, {0: 1.0})
allclose(model.treeWeights, [1.0, 1.0, 1.0])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
result = model.transform(test0).head()
result.prediction
#0.0
numpy.argmax(result.probability)
#0
numpy.argmax(result.rawPrediction)
#0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
model.trees
#[DecisionTreeClassificationModel (uid=...) of depth..., DecisionTreeClassificationModel...]
import tempfile
temp_path = tempfile.mkdtemp()  # any writable directory works
rfc_path = temp_path + "/rfc"
rf.save(rfc_path)
rf2 = RandomForestClassifier.load(rfc_path)
rf2.getNumTrees()
#3
model_path = temp_path + "/rfc_model"
model.save(model_path)
model2 = RandomForestClassificationModel.load(model_path)
model.featureImportances == model2.featureImportances
#True
GBTClassifier
class pyspark.ml.classification.GBTClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic", maxIter=20, stepSize=0.1, seed=None, subsamplingRate=1.0)
This classifier is implemented following:
J.H. Friedman. “Stochastic Gradient Boosting.” 1999.
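Parameters
checkpointInterval: same as DecisionTreeClassifier
fit(dataset, params=None) method
lossType: loss function that GBT tries to minimize; supported option: "logistic"
maxBins: same as DecisionTreeClassifier
maxDepth: same as DecisionTreeClassifier
maxIter: maximum number of iterations, i.e. the number of trees in the ensemble (>= 0)
minInfoGain: same as DecisionTreeClassifier
minInstancesPerNode: same as DecisionTreeClassifier
stepSize: step size (learning rate) in (0, 1] that shrinks the contribution of each tree
subsamplingRate: same as RandomForestClassifier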
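Getter and setter methods for every parameter
Methods and attributes of the fitted model
featureImportances: same as DecisionTreeClassifier
getNumTrees: same as RandomForestClassifier
totalNumNodes: total number of nodes, summed over all trees
transform(dataset, params=None) method
treeWeights: same as RandomForestClassifier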
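The roles of maxIter, stepSize and treeWeights can be illustrated with a small sketch of how a boosted ensemble scores one row. Each of the maxIter regression trees (the example below lists them as DecisionTreeRegressionModel) returns a number; the margin is the weighted sum of those numbers, with weight 1.0 for the first tree and stepSize for the rest, which matches the treeWeights [1.0, 0.1, 0.1, 0.1, 0.1] printed below. The mapping from margin to probability, 1 / (1 + exp(-2F)), is an assumption based on my understanding of Spark's logistic loss with labels encoded as -1/+1; the helper names are hypothetical.

```python
import math


def gbt_tree_weights(num_trees, step_size):
    """Weights as in the example output: 1.0 for the first tree, stepSize after."""
    return [1.0] + [step_size] * (num_trees - 1)


def gbt_classify(tree_outputs, tree_weights):
    """Hypothetical helper: combine per-tree regression outputs into a prediction.

    tree_outputs[i] is what regression tree i returns for one row.
    Assumption: labels are encoded as -1/+1 internally, and the logistic
    loss maps the margin F to P(label = 1) = 1 / (1 + exp(-2F)).
    """
    margin = sum(w * f for w, f in zip(tree_weights, tree_outputs))
    p1 = 1.0 / (1.0 + math.exp(-2.0 * margin))
    prediction = 1.0 if margin > 0.0 else 0.0
    return margin, [1.0 - p1, p1], prediction


weights = gbt_tree_weights(5, 0.1)
print(weights)
print(gbt_classify([1.0, 0.5, 0.4, 0.3, 0.2], weights))
```

A smaller stepSize shrinks every tree after the first, so more iterations are usually needed to reach the same fit; that trade-off is the usual reason to tune maxIter and stepSize together.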
Code
from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)),(0.0,Vectors.sparse(1, [], []))], ["label", "features"])
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
model = gbt.fit(td)
model.featureImportances
#SparseVector(1, {0: 1.0})
allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
model.totalNumNodes
#15
print(model.toDebugString)
#GBTClassificationModel (uid=...)...with 5 trees...
import tempfile
temp_path = tempfile.mkdtemp()  # any writable directory works
gbtc_path = temp_path + "/gbtc"
gbt.save(gbtc_path)
gbt2 = GBTClassifier.load(gbtc_path)
gbt2.getMaxDepth()
#2
model_path = temp_path + "/gbtc_model"
model.save(model_path)
model2 = GBTClassificationModel.load(model_path)
model.featureImportances == model2.featureImportances
#True
model.treeWeights == model2.treeWeights
#True
model.trees
#[DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]
NaiveBayes
class pyspark.ml.classification.NaiveBayes(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", smoothing=1.0, modelType="multinomial", thresholds=None, weightCol=None)
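Parameters
fit(dataset, params=None) method
modelType: options: "multinomial" and "bernoulli"
smoothing: smoothing parameter, must be >= 0; default 1.0
Getter and setter methods for every parameter
Methods and attributes of the fitted model
numClasses: number of classes
numFeatures: number of features the model was trained on; returns -1 if unknown
pi: log of the class priors
theta: log of the class conditional probabilities
transform(dataset, params=None) method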
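For modelType="multinomial", pi and theta follow from weighted counts with additive smoothing lambda: pi_c = log(n_c + lambda) - log(N + K * lambda), where n_c is the total weight of class c, N the total weight and K the number of classes, and theta_cj = log(s_cj + lambda) - log(sum_j s_cj + D * lambda), where s_cj is the weighted sum of feature j in class c and D the number of features. Scoring adds pi_c to theta_c · x and applies a softmax. The plain-Python sketch below (hypothetical helper names; the Bernoulli model is not covered) reproduces, up to the digits shown, the pi, theta, rawPrediction and probability values printed by the example that follows when fed the same weighted rows.

```python
import math


def train_multinomial_nb(rows, num_classes, num_features, smoothing=1.0):
    """Hypothetical helper. rows: (label, weight, features). Returns (pi, theta) as logs."""
    class_weight = [0.0] * num_classes
    feature_sum = [[0.0] * num_features for _ in range(num_classes)]
    for label, weight, x in rows:
        c = int(label)
        class_weight[c] += weight
        for j, v in enumerate(x):
            feature_sum[c][j] += weight * v
    pi_denom = math.log(sum(class_weight) + num_classes * smoothing)
    pi = [math.log(w + smoothing) - pi_denom for w in class_weight]
    theta = []
    for c in range(num_classes):
        theta_denom = math.log(sum(feature_sum[c]) + num_features * smoothing)
        theta.append([math.log(s + smoothing) - theta_denom for s in feature_sum[c]])
    return pi, theta


def predict_multinomial_nb(pi, theta, x):
    """rawPrediction = pi + theta . x; probability = softmax(rawPrediction)."""
    raw = [pi[c] + sum(t * v for t, v in zip(theta[c], x)) for c in range(len(pi))]
    top = max(raw)  # subtract the max for numerical stability
    exps = [math.exp(r - top) for r in raw]
    total = sum(exps)
    probability = [e / total for e in exps]
    prediction = float(max(range(len(pi)), key=lambda c: probability[c]))
    return raw, probability, prediction


rows = [(0.0, 0.1, [0.0, 0.0]), (0.0, 0.5, [0.0, 1.0]), (1.0, 1.0, [1.0, 0.0])]
pi, theta = train_multinomial_nb(rows, num_classes=2, num_features=2)
print(pi, theta)
print(predict_multinomial_nb(pi, theta, [1.0, 0.0]))
```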
Code
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
df = spark.createDataFrame([Row(label=0.0, weight=0.1, features=Vectors.dense([0.0, 0.0])),Row(label=0.0, weight=0.5, features=Vectors.dense([0.0, 1.0])),Row(label=1.0, weight=1.0, features=Vectors.dense([1.0, 0.0]))])
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
model = nb.fit(df)
model.pi
#DenseVector([-0.81..., -0.58...])
model.theta
#DenseMatrix(2, 2, [-0.91..., -0.51..., -0.40..., -1.09...], 1)
test0 = sc.parallelize([Row(features=Vectors.dense([1.0, 0.0]))]).toDF()
result = model.transform(test0).head()
result.prediction
#1.0
result.probability
#DenseVector([0.32..., 0.67...])
result.rawPrediction
#DenseVector([-1.72..., -0.99...])
test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
model.transform(test1).head().prediction
#1.0
import tempfile
temp_path = tempfile.mkdtemp()  # any writable directory works
nb_path = temp_path + "/nb"
nb.save(nb_path)
nb2 = NaiveBayes.load(nb_path)
nb2.getSmoothing()
#1.0
model_path = temp_path + "/nb_model"
model.save(model_path)
model2 = NaiveBayesModel.load(model_path)
model.pi == model2.pi
#True
model.theta == model2.theta
#True
nb = nb.setThresholds([0.01, 10.00])
model3 = nb.fit(df)
result = model3.transform(test0).head()
result.prediction
#0.0
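The last step above shows setThresholds([0.01, 10.00]) flipping the prediction for test0 from 1.0 to 0.0. The thresholds parameter, shared by the probabilistic classifiers here (including LogisticRegression), predicts the class with the largest p / t, where p is the class probability and t that class's threshold. The sketch below applies that rule to roughly the probability vector from the example; the helper name is hypothetical, and treating t = 0 as infinity is my reading of how Spark handles a zero threshold.

```python
def predict_with_thresholds(probability, thresholds):
    """Hypothetical helper: pick the class with the largest p / t.

    A threshold of 0 is treated as +infinity, so that class always wins.
    """
    if len(probability) != len(thresholds):
        raise ValueError("need one threshold per class")
    scaled = [float("inf") if t == 0.0 else p / t
              for p, t in zip(probability, thresholds)]
    return float(max(range(len(scaled)), key=lambda k: scaled[k]))


prob = [0.32, 0.68]  # roughly the probability vector for test0 above
print(predict_with_thresholds(prob, [1.0, 1.0]))    # equal thresholds: plain argmax
print(predict_with_thresholds(prob, [0.01, 10.0]))  # strongly favours class 0
```

With thresholds [0.01, 10.0], class 0 scores 0.32 / 0.01 = 32 while class 1 scores only 0.68 / 10 = 0.068, which is why model3 predicts 0.0.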