1. 程式人生 > >資料探勘工具---spark使用練習---ml(二)

資料探勘工具---spark使用練習---ml(二)

模型訓練

評估器

評估器是需要評估的統計模型,對所觀測物件做預測或分類。如果從抽象的評估器類派生,新模型必須實現.fit()方法,該方法用給出的在DataFrame中找到的資料和某些預設或自定義的引數來擬合模型。在PySpark 中,由很多評估器可用,本文以Spark2.2.1中提供的模型。
分類
分類
ML包為資料科學家提供了七種分類(Classification)模型以供選擇。

線性迴歸

  • class pyspark.ml.regression.LinearRegression(featuresCol=‘features’, labelCol=‘label’, predictionCol=‘prediction’, maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-06, fitIntercept=True, standardization=True, solver=‘auto’, weightCol=None, aggregationDepth=2)
>>> from pyspark.ml.linalg import Vectors
>>> df = spark.createDataFrame([
...     (1.0, 2.0, Vectors.dense(1.0)),
...     (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"])
>>> lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight"
) >>> model = lr.fit(df) >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"]) >>> abs(model.transform(test0).head().prediction - (-1.0)) < 0.001 True >>> abs(model.coefficients[0] - 1.0) < 0.001 True >>> abs(model.intercept - 0.0) < 0.001 True
>>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"]) >>> abs(model.transform(test1).head().prediction - 1.0) < 0.001 True >>> lr.setParams("vector") Traceback (most recent call last): ... TypeError: Method setParams forces keyword arguments. >>> lr_path = temp_path + "/lr" >>> lr.save(lr_path) >>> lr2 = LinearRegression.load(lr_path) >>> lr2.getMaxIter() 5 >>> model_path = temp_path + "/lr_model" >>> model.save(model_path) >>> model2 = LinearRegressionModel.load(model_path) >>> model.coefficients[0] == model2.coefficients[0] True >>> model.intercept == model2.intercept True >>> model.numFeatures 1

邏輯迴歸

(1)LogisticRegression :邏輯迴歸,支援多項邏輯(softmax)和二項邏輯迴歸。

  • pyspark.ml.classification.LogisticRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol="probability", rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto")

  • setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol="probability", rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto")

#-*- coding:utf-8 -*-

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel

if __name__=="__main__":
    sc=SparkContext(appName="myApp")
    spark=SparkSession.builder.getOrCreate()
    df=sc.parallelize([
    Row(label=1.0,weight= 1.0,features=Vectors.dense(0.0, 5.0)),
    Row(label=0.0,weight= 2.0,features=Vectors.dense(1.0, 2.0)),
    Row(label=1.0,weight= 3.0,features=Vectors.dense(2.0, 1.0)),
    Row(label=0.0,weight= 4.0,features=Vectors.dense(3.0, 3.0))
    ]).toDF()
    """
    df.show()
    +---------+-----+------+
    | features|label|weight|
    +---------+-----+------+
    |[0.0,5.0]|  1.0|   1.0|
    |[1.0,2.0]|  0.0|   2.0|
    |[2.0,1.0]|  1.0|   3.0|
    |[3.0,3.0]|  0.0|   4.0|
    +---------+-----+------+
    """
    blor=LogisticRegression(regParam=0.01, weightCol="weight")
    blorModel=blor.fit(df)
    # print(blorModel.coefficients)
    # [-1.08072664359,-0.646290405354]
    # print(blorModel.intercept)
    # 3.1127663191585144
    test0=sc.parallelize([Row(features=Vectors.dense(-1.0, 1.0))]).toDF()
    result=blorModel.transform(test0)
    """
    result.show()
    +----------+--------------------+--------------------+----------+
    |  features|       rawPrediction|         probability|prediction|
    +----------+--------------------+--------------------+----------+
    |[-1.0,1.0]|[-3.5472025573965...|[0.02799860485691...|       1.0|
    +----------+--------------------+--------------------+----------+
    """
    temp_path="/tmp/test"
    lr_path = temp_path + "/lr"
    blor.save(lr_path)
    blorLoad=LogisticRegression.load(lr_path)
    # print(blorLoad.getRegParam())
    # 0.01
    model_path = temp_path + "/lr_model"
    blorModel.save(model_path)
    blorModelLoad=LogisticRegressionModel.load(model_path)
    # print(blorModelLoad.intercept==blorModel.intercept)
    # True

支援向量機

**class pyspark.ml.classification.LinearSVC(*args, kwargs)
這個二元分類器使用OWLQN優化器來 優化the Hinge Loss,目前只支援L2正則化。

#-*- coding:utf-8 -*-
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LinearSVCModel

sc=SparkContext(appName="myApp")
spark=SparkSession.builder.enableHiveSupport().getOrCreate()
df = sc.parallelize([Row(label=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),Row(label=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()
svm=LinearSVC(maxIter=5, regParam=0.01)
model=svm.fit(df)
print(model.coefficients)
#[0.0,-0.27917116657,-0.183278426036]
print(model.intercept)
#1.0206118982229047
print(model.numClasses)
#2
print(model.numFeatures)
#3
test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 2.0, 3.0))]).toDF()
result=model.transform(test0).head()
print(result.prediction)
#0.0
print(result.rawPrediction)
#[0.0875657130274,-0.0875657130274]

#儲存模型的訓練引數
temp_path=""
svm_path = temp_path + "/svm"
svm.save(svm_path)
#過載模型訓練引數
svm2 = LinearSVC.load(svm_path)
svm2.getMaxIter()
# 5
# 儲存訓練好的模型
model_path = temp_path + "/svm_model"
model.save(model_path)
#過載訓練好的模型
model2=LinearSVCModel.load(model_path)
model.coefficients[0] == model2.coefficients[0]
# True
model.intercept == model2.intercept
# True

決策樹

(1)DecisionTreeClassifier:支援二進位制和多類標籤,以及連續和分類功能

  • pyspark.ml.classification.DecisionTreeClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", seed=None)

  • maxDepth引數來限制樹的深度;

  • minInstancesPerNode確定需要進一步拆分的樹節點的觀察物件的最小數量;maxBins引數指定連續變數將被分割的Bin的最大數量;

  • impurity指定用於測量並計算來自分割的資訊的度量。

(2)DecisionTreeRegressor:支援連續和分類功能。

  • pyspark.ml.regression.DecisionTreeRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", seed=None, varianceCol=None)
>>> from pyspark.ml.linalg import Vectors
>>> df = spark.createDataFrame([
...     (1.0, Vectors.dense(1.0)),
...     (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
>>> dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
>>> model = dt.fit(df)
>>> model.depth
1
>>> model.numNodes
3
>>> model.featureImportances
SparseVector(1, {0: 1.0})
>>> model.numFeatures
1
>>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
>>> model.transform(test0).head().prediction
0.0
>>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
>>> model.transform(test1).head().prediction
1.0
>>> dtr_path = temp_path + "/dtr"
>>> dt.save(dtr_path)
>>> dt2 = DecisionTreeRegressor.load(dtr_path)
>>> dt2.getMaxDepth()
2
>>> model_path = temp_path + "/dtr_model"
>>> model.save(model_path)
>>> model2 = DecisionTreeRegressionModel.load(model_path)
>>> model.numNodes == model2.numNodes
True
>>> model.depth == model2.depth
True
>>> model.transform(test1).head().variance
0.0

梯度提升決策樹模型

(1)GBTClassifier:支援二元標籤,以及連續和分類功能,不支援多類標籤

用於分類的梯度提升決策樹模型。該模型屬於整合模型(Ensemble methods)家族。整合模型結合多個弱預測模型而形成一個強健的模型。

  • pyspark.ml.classification.GBTClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic", maxIter=20, stepSize=0.1, seed=None, subsamplingRate=1.0)

引數參考:是mllib的介面而還是ml的介面,這裡只是作一個參考
trainClassifier引數說明

  • data - 訓練資料集:LabeledPoint的RDD。標籤取值{0,1}
  • categoricalFeaturesInfo - 儲存類別特徵的Map。條目(n - > k)表示特徵n對應k個類別,類別由{0,1,...,k-1}索引。
  • loss - 梯度提升中用到的損失函式。支援的值:“logLoss”,“minimumSquaresError”,“minimumAbsoluteError”。 (預設值:“logLoss”)
  • numIterations - 迭代次數。 (預設值:100)
  • learningRate - 學習率。學習率應在間隔(0,1)之間(預設值:0.1)
  • maxDepth - 樹的最大深度(例如深度0表示1個葉節點,深度1表示1個內部節點+ 2個葉節點)。 (預設值:3)
  • maxBins - 用於分割特徵的最大bin數量。 DecisionTree需要maxBins> = max類別。 (預設值:32)
  • 返回值:GradientBoostedTreesModel 可用於預測。
#官網例子
#-*- coding:utf-8 -*-

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import GBTClassificationModel
from numpy import allclose

# data=[LabeledPoint(0.0,[0.0]),LabeledPoint(0.0,[1.0]),LabeledPoint(1.0,[2.0]),LabeledPoint(1.0,[3.0])]
sc=SparkContext(appName="myApp")
spark=SparkSession.builder.enableHiveSupport().getOrCreate()

df = spark.createDataFrame([(1.0, Vectors.dense(1.0)),(0.0, Vectors.sparse(1, [], []))], ["label", "features"])
# df.show()
stringIndexer=StringIndexer(inputCol="label",outputCol="indexed")
siModel=stringIndexer.fit(df)
td=siModel.transform(df)
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
model = gbt.fit(td)
model.featureImportances
#SparseVector(1, {0: 1.0})
allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
model.totalNumNodes
#15
print(model.toDebugString)
temp_path="/tmp/test/"
gbtc_path = temp_path + "gbtc"
gbt.save(gbtc_path)
model_path = temp_path + "gbtc_model"
model.save(model_path)


gbt2 = GBTClassifier.load(gbtc_path)
gbt2.getMaxDepth()
#2

model2 = GBTClassificationModel.load(model_path)
model.featureImportances == model2.featureImportances
#True
model.treeWeights == model2.treeWeights
# True
model.trees
# [DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]

(2)GBTRegressor:梯度提升樹(GBT)的迴歸學習演算法,它支援連續和分類功能。

  • pyspark.ml.regression.GBTRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, subsamplingRate=1.0, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1, seed=None, impurity="variance")

trainRegressor引數說明

  • data - 訓練資料集:LabeledPoint的RDD。標籤取值{0,1}。
  • categoricalFeaturesInfo - 儲存類別特徵的Map。條目(n - > k)表示特徵n對應k個類別,類別由{0,1,…,k-1}索引。
  • loss - 損失函式。支援的值:“logLoss”,“minimumSquaresError”,“minimumAbsoluteError”。 (預設值:“leastSquaresError”)
  • numIterations - 提升次數。 (預設值:100)
  • learningRate - 學習率。學習率應在間隔(0,1)之間(預設值:0.1)
  • maxDepth - 樹的最大深度(例如深度0表示1個葉節點,深度1表示1個內部節點+ 2個葉節點)。 (預設值:3)
  • maxBins - 用於分裂特徵的最大bin數量。 DecisionTree需要maxBins> = max類別。 (預設值:32)
  • 返回值:GradientBoostedTreesModel可用於預測。

隨機森林

(1)RandomForestClassifier:隨機森林學習演算法的分類。 它支援二進位制和多類標籤,以及連續和分類功能。
該模型產生多個決策樹,使用模式輸出的決策樹來對觀察物件進行分類。

  • pyspark.ml.classification.RandomForestClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", numTrees=20, featureSubsetStrategy="auto", seed=None, subsamplingRate=1.0)
#-*- coding:utf-8 -*-

from pyspark import SparkContext
from pyspark.sql import SparkSession
import numpy
from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import RandomForestClassificationModel

sc=SparkContext(appName="myApp")
spark=SparkSession.builder.enableHiveSupport().getOrCreate()
df=spark.createDataFrame([(1.0,Vectors.dense(1.0)),(0.0,Vectors.sparse(1,[],[]))],["label","features"])
stringIndexer=StringIndexer(inputCol="label",outputCol="indexed")
siModel=stringIndexer.fit(df)
td=siModel.transform(df)
rf=RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
model=rf.fit(td)
print(model.featureImportances)
#(1,[0],[1.0])
print(model.treeWeights)
# [1.0, 1.0, 1.0]
print(allclose(model.featureImportances,[1.0, 1.0, 1.0]))
# True
#利用訓練的模型對新資料進行預測
test0 = spark.createDataFrame([(Vectors.dense(-1),)],["features"])
result=model.transform(test0).head()
print(result.prediction)
# 0.0
print(numpy.argmax(result.probability))
# 0
print(numpy.argmax(result.rawPrediction))
# 0
print(model.trees)
#[DecisionTreeClassificationModel (uid=dtc_d54f917f8495) of depth 0 with 1 nodes, DecisionTreeClassificationModel (uid=dtc_e5ab92161f67) of depth 1 with 3 nodes, DecisionTreeClassificationModel (uid=dtc_437a7e97c21f) of depth 1 with 3 nodes]
#儲存模型引數
temp_path=""
rfc_path = temp_path + "/rfc"
rf.save(rfc_path)
#過載未訓練模型
rf2=RandomForestClassifier.load(rfc_path)
print(rf2.getNumTrees())
# 3
#儲存訓練好的模型
model_path=temp_path + "/rfc_model"
model.save(model_path)
#過載訓練好的模型
model2=RandomForestClassificationModel.load(model_path)
print(model.featureImportances==model2
            
           

相關推薦

資料工具---spark使用練習---ml()

模型訓練 評估器 評估器是需要評估的統計模型,對所觀測物件做預測或分類。如果從抽象的評估器類派生,新模型必須實現.fit()方法,該方法用給出的在DataFrame中找到的資料和某些預設或自定義的引數來擬合模型。在PySpark 中,由很多評估器可用,本文以Sp

資料工具---spark使用練習---ml(一)

Spark中ml和mllib的區別 來源: Spark中ml和mllib的主要區別和聯絡如下: ml和mllib都是Spark中的機器學習庫,目前常用的機器學習功能2個庫都能滿足需求。 spark官方推薦使用ml, 因為ml功能更全面更靈活,未來會主要支援ml

資料工具---Spark的使用方法(

Spark使用過程中報錯彙總 報錯1: “ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(ap

資料工具---pyspark使用方法練習

來源,官網spark2.2.1版本 pyspark不同函式的形象化解釋 SparkSession是Spark 2.0引入的新概念。SparkSession為使用者提供了統一的切入點,來讓使用者學習spark的各項功能。 在spark的早期版本中,SparkC

零基礎入門大資料spark中的幾種map

今天再來說一下spark裡面的幾種map方法。前面的文章介紹過單純的map,但是spark還有幾種map值得對比一下,主要是下面幾種: map:普通的map flatMap:在普通map的基礎上多了一個操作,扁平化操作; mapPartitions:相對於分割槽P

零基礎入門大資料spark的rdd

本節簡單介紹一下spark下的基本資料結構RDD,方便理解後續的更多操作。 那麼第一個問題,什麼是rdd。我們知道,大資料一般儲存在分散式叢集裡面,那麼你在對其進行處理的時候總得把它讀出來吧,讀出來後總得把它存成某種格式的檔案吧,就好比程式語言裡面的,這個資料是陣列,那麼你可以以陣列

【機器學習】資料演算法——關聯規則(),挖掘過程,Aprioir演算法

關聯規則挖掘的原理和過程 從關聯規則(一)的分析中可知,關聯規則挖掘是從事務集合中挖掘出這樣的關聯規則:它的支援度和置信度大於最低閾值(minsup,minconf),這個閾值是由使用者指定的。根據 support=(X,Y).count/T.countsupp

推薦:六款強大的開源資料工具

在網際網路發展到大資料時代,那麼資料就等於金錢。隨著向一個基於應用的領域過渡,資料則呈現出了指數級增長。然而,百分之八十的資料是非結構化的,因此它需要一個程式和方法來從中提取有用資訊,並且將其轉換為可理解、可用的結構化形式。 在資料探勘過程中,有大量的工具可供使用,比如採用人工智慧、機器學習,以及其他技術等

Mahout資料工具

Mahout是Hadoop系統基於MapReduce開發的資料探勘/機器學習庫,實現了大部分常用的資料探勘演算法。 Mahout提供的演算法:Classification、Clustering、Dimension Reduction、Freq. Pattern Mining

Rattle :基於R的資料工具(4b):探索資料

(3)相關性Correlation選項。計算數值變數間的相關係數。 相關係數可以採用pearson,kendall,spearman三種方法。會輸出相關係數矩陣。可以把結果視覺化。 這個圖當中,紅色表示負相關,藍色為正相關,顏色越淺相關係數(絕對值)越小,越接近直線

資料 | 親和性分析(

上回講了親和性分析的簡單分析,但只計算了一條規則的支援度和置信度,現在來說說怎麼計算所有規則的支援度和置信度 首先先建立字典,分別建立有效規則字典、無效規則字典以及條件相同的規則數量 # 建立字典,儲存規則有效資料及無效資料 from collection

資料工具---sklearn使用總結

本文來源Cer_ml和Jorocco; sklearn是一個數據挖掘的python庫,github地址,該庫集成了大量的資料探勘演算法,並可以對資料做預處理,對演算法進行整合和預測結果進行驗證和評估。sklearn在資料量不是特別大的時候是很好用的;在大資料時,

資料工具Weka之資料格式ARFF及CSV檔案格式轉換

Weka介紹: Weka是一個用Java編寫的資料探勘工具,能夠執行在各種平臺上。它不僅提供了可以直接用於資料探勘的軟體,還提供了src程式碼,使用者可以修改原始碼,進行二次開發。但是,由於其使用了Java虛擬機器,導致其不適合處理大型資料,執行緩慢。處理超過一定大小資料,

Python資料工具總結

      Python語言之所以很流行,廣泛用於機器學習,資料探勘等領域,因為它有強大的第三方庫,下面我們就來做一個簡單總結。     Numpy:       提供陣列支援,向量運算,以及高效的處理函式,線性代數處理等。                         參

資料入門系列教程()之分類問題OneR演算法

資料探勘入門系列教程(二)之分類問題OneR演算法 資料探勘入門系列部落格:https://www.cnblogs.com/xiaohuiduan/category/1661541.html 專案地址:GitHub 在上一篇部落格中,我們通過分析親和性來尋找資料集中資料與資料之間的相關關係。這篇部落

資料乾貨總結()--NLP進階-詳解Jieba分詞工具

NLP進階-詳解Jieba分詞工具 一、Jieba分詞工具 1. 三種模式 • 精確模式:將句子最精確的分開,適合文字分析 • 全模式:句子中所有可以成詞的詞語都掃描出來,速度快,不能解決歧義 • 搜尋引擎模式:在精確模式基礎上,對長詞再次切分,提高召回 2.實現的演算法 • 基於Tri

【Mark Schmidt課件】機器學習與資料——普通最小

本課件主要內容包括: 有監督學習:迴歸 示例:依賴與解釋變數 數字標籤的處理 一維線性迴歸 最小二乘目標 微分函式最小化 最小二乘解 二維最小二乘 d維最小二乘 偏微分

資料:基於Spark+HanLP實現影視評論關鍵詞抽取(1)

1. 背景 近日專案要求基於爬取的影視評論資訊,抽取影視的關鍵字資訊。考慮到影視評論資料量較大,因此採用Spark處理框架。關鍵詞提取的處理主要包含分詞+演算法抽取兩部分。目前分詞工具包較為主流的,包括哈工大的LTP以及HanLP,而關鍵詞的抽取演算法較多,包括TF-IDF、TextRank、互資訊等。本次

吳裕雄 資料與分析案例實戰(3)——python數值計算工具:Numpy

# 匯入模組,並重命名為npimport numpy as np# 單個列表建立一維陣列arr1 = np.array([3,10,8,7,34,11,28,72])print('一維陣列:\n',arr1)# 一維陣列元素的獲取print(arr1[[2,3,5,7]]) # 巢狀元組建立二維陣列ar

吳裕雄 資料與分析案例實戰(4)——python資料處理工具:Pandas

# 匯入模組import pandas as pdimport numpy as np # 構造序列gdp1 = pd.Series([2.8,3.01,8.99,8.59,5.18])print(gdp1)# 取出gdp1中的第一、第四和第五個元素print('行號風格的序列:\n',gdp1[[0,3,