
《深度實踐Spark機器學習》 Chapter 11: pyspark Decision Tree Models

Since the book does not ship with code, all of the code below was tested by me in an ipynb notebook; the runtime environment is HDP 2.6.2 and Anaconda2.

Full ipynb and py code:

https://gitee.com/iscas/deep_spark_ml/tree/master


11.3 Loading the Data
Remove the header row:
sed 1d train.tsv > train_noheader.tsv

Upload to HDFS:
hdfs dfs -put train_noheader.tsv /u01/bigdata/data/




11.4 Data Exploration
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark
Path = "hdfs://XX:8020/u01/bigdata/"
raw_data = sc.textFile(Path + "data/train_noheader.tsv")
raw_data.take(2)

Check the total number of rows:
numRaw = raw_data.count()
numRaw

Count records by key:
raw_data.countByKey()
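Note that countByKey on an RDD of plain strings groups by the first character of each line, which is rarely what you want here. To look at the class-label distribution instead, a small sketch (assuming, as in this dataset, that the label is the last tab-separated field and is quoted) is:

# count how many records fall into each class label
label_counts = raw_data.map(lambda line: line.split('\t')[-1].replace('"', '')).countByValue()
print(label_counts)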

11.5 Data Preprocessing
1) Split each line on tabs, so that the timestamp and page-content fields can be filtered out later:
records = raw_data.map(lambda line: line.split('\t'))
records.first()

3) Check the number of columns in each row:
len(records.first())

4) Return all elements of the RDD as a list:
data = records.collect()

5) Check how many columns one row of data has:
numColumns = len(data[0])
numColumns

6) Clean the data and build a list data1 holding the processed records, in the format [(label_1, feature_1), (label_2, feature_2), ...].
The cleaning steps are:
1) remove the quotation marks;
2) convert the label column (the last column) to an integer;
3) replace "?" values in the feature columns (columns 5 to 27) with 0.0.
The code is as follows:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier

data1 = []
for i in range(numRaw):
    trimmed = [each.replace('"', "") for each in data[i]]
    label = int(trimmed[-1])
    features = map(lambda x: 0.0 if x == "?" else x, trimmed[4: numColumns-1])  # keep only columns 5 to 27
    c = (label, Vectors.dense(map(float, features)))
    data1.append(c)

Inspect data1:
data1[0]  # output below
(0, DenseVector([0.7891, 2.0556, 0.6765, 0.2059, 0.0471, 0.0235, 0.4438, 0.0, 0.0, 0.0908, 0.0, 0.2458, 0.0039, 1.0, 1.0, 24.0, 0.0, 5424.0, 170.0, 8.0, 0.1529, 0.0791]))
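One caveat: the loop above relies on Python 2 (the Anaconda2 environment mentioned at the top), where map() returns a list. Under Python 3, map() returns an iterator, so the same cleaning step would need list comprehensions (or explicit list() calls), roughly:

# Python 3 variant of the cleaning loop: list comprehensions instead of map()
data1 = []
for row in data:
    trimmed = [field.replace('"', '') for field in row]
    label = int(trimmed[-1])
    features = [0.0 if x == "?" else float(x) for x in trimmed[4:numColumns - 1]]
    data1.append((label, Vectors.dense(features)))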


11.6 Building the Decision Tree Model
1) Convert data1 into a DataFrame, with label as the label column and features as the feature column:
df = spark.createDataFrame(data1, ["label", "features"])
df.show(2)

# show the schema of df
df.printSchema()  # output below
root
|-- label: long (nullable = true)
|-- features: vector (nullable = true)
 
2) Cache df in memory:
df.cache()

3) Build a feature indexer:
from pyspark.ml.feature import VectorIndexer
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24).fit(df)
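With maxCategories=24, any feature with at most 24 distinct values is treated as categorical and re-indexed; features with more distinct values stay continuous. As a quick sanity check (not a required step; categoryMaps is exposed on the fitted VectorIndexerModel in Spark 2.x), you can inspect which features were indexed:

# number of features, and the value -> index maps of the features treated as categorical
print(featureIndexer.numFeatures)
print(featureIndexer.categoryMaps)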

4) Split the data into an 80% training set and a 20% test set:
(trainData, testData) = df.randomSplit([0.8, 0.2], seed=1234L)  # a fixed seed makes the split reproducible, so the train/test row counts stay the same across runs
trainData.count()
testData.count()

5) Specify the decision tree's maximum depth, the label and feature columns, and entropy as the impurity measure:
dt = DecisionTreeClassifier(maxDepth=5, labelCol="label", featuresCol="indexedFeatures", impurity="entropy")
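For reference, the entropy impurity used here is -Σ p_i·log2(p_i) over the class proportions p_i at a node, and splits are chosen to maximize the drop in impurity (information gain). A quick plain-Python illustration:

import math

def entropy(proportions):
    """Entropy impurity for a list of class proportions summing to 1."""
    return sum(-p * math.log(p, 2) for p in proportions if p > 0)

print(entropy([0.5, 0.5]))  # 1.0 -> maximally impure two-class node
print(entropy([1.0]))       # 0.0 -> pure node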

6) Build the pipeline workflow:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = [featureIndexer, dt])
model = pipeline.fit(trainData)  # train the model


11.7 Using the Trained Model for Prediction
2) Predict on the feature values of the first row of the dataset:
test0 = spark.createDataFrame([(data1[0][1],)], ["features"])
result = model.transform(test0)
result.show()
Output:
+--------------------+--------------------+-------------+--------------------+----------+
|            features|     indexedFeatures|rawPrediction|         probability|prediction|
+--------------------+--------------------+-------------+--------------------+----------+
|[0.789131,2.05555...|[0.789131,2.05555...|[564.0,578.0]|[0.49387040280210...|       1.0|
+--------------------+--------------------+-------------+--------------------+----------+

result.select(['prediction']).show()  # show only the prediction column
Output:
+----------+
|prediction|
+----------+
|       1.0|
+----------+

3) Modify two of the first row's feature values (here the 1st and 2nd values) and predict on the modified features:
firstRaw = list(data1[0][1])
firstRaw[0]  # inspect the current first value
firstRaw[1]  # inspect the current second value

predictData = Vectors.dense(firstRaw)
predictData
Result:
DenseVector([0.7891, 2.0556, 0.6765, 0.2059, 0.0471, 0.0235, 0.4438, 0.0, 0.0, 0.0908, 0.0, 0.2458, 0.0039, 1.0, 1.0, 24.0, 0.0, 5424.0, 170.0, 8.0, 0.1529, 0.0791])
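As written, the snippet above only reads the first two values and never reassigns them, which is why the vector (and the prediction in the next step) are unchanged. To actually predict on a modified feature vector, assign new values first; a sketch with arbitrary placeholder values (0.5 and 1.0 are not from the book):

# overwrite the first two feature values with arbitrary placeholders, then predict
modifiedRaw = list(data1[0][1])
modifiedRaw[0] = 0.5
modifiedRaw[1] = 1.0
modifiedDF = spark.createDataFrame([(Vectors.dense(modifiedRaw),)], ["features"])
model.transform(modifiedDF).select("prediction").show()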

4) Predict on the new data:
predictRaw = spark.createDataFrame([(predictData,)], ["features"])
predictResult = model.transform(predictRaw)
predictResult.show()
Result:
+--------------------+--------------------+-------------+--------------------+----------+
|            features|     indexedFeatures|rawPrediction|         probability|prediction|
+--------------------+--------------------+-------------+--------------------+----------+
|[0.789131,2.05555...|[0.789131,2.05555...|[564.0,578.0]|[0.49387040280210...|       1.0|
+--------------------+--------------------+-------------+--------------------+----------+

5) Test with the test data:
# predict on the test set with the trained model
predictResultAll = model.transform(testData)

predictResultAll.select(['prediction']).show()

# the predictions come back as a DataFrame whose rows are Row objects and cannot be modified directly
# so convert the prediction column to pandas, then to a plain list
df_predict = predictResultAll.select(['prediction']).toPandas()
dtPredict = list(df_predict.prediction)

# look at the first 10 predictions
dtPredict[:10]

# tally how many predictions are correct
dtTotalCorrect = 0

# total number of rows in the test set
testRaw = testData.count()
# testLabel = testData.select("label").collect()  # this returns Row objects rather than a plain list (version issue?)
df_test = testData.select(['label']).toPandas()
testLabel = list(df_test.label)
testLabel[:10]

for i in range(testRaw):
    if dtPredict[i] == testLabel[i]:
        dtTotalCorrect += 1

1.0 * dtTotalCorrect / testRaw 
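The same accuracy can also be computed directly on the prediction DataFrame, without converting to pandas, using MulticlassClassificationEvaluator (metricName="accuracy" requires Spark 2.0+; this is an alternative, not the book's approach):

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# accuracy = fraction of rows where prediction equals label
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
print(acc_evaluator.evaluate(predictResultAll))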


11.8 Model Optimization
11.8.1 Optimizing the Features
1) Bring in the code used earlier:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorIndexer
from pyspark.ml import Pipeline
raw_data = sc.textFile(Path + "data/train_noheader.tsv")
numRaw = raw_data.count()
records = raw_data.map(lambda line: line.split('\t'))
data = records.collect()
numColumns = len(data[0])
data1 = []

2) The page-category field has many distinct values, so pick it out for separate handling:
# strip the quotes from the page-category field (index 3)
category = records.map(lambda x: x[3].replace("\"", ""))
categories = sorted(category.distinct().collect())
categories

3) Check the number of page categories:
numCategories = len(categories)
numCategories

4) Use "one-of-K" (one-hot) encoding for the category, i.e. lists such as [0,0,0,0] and [1,0,0,0]; define a function that returns the encoded list for a given page category:
def transform_category(x):
    markCategory = [0] * numCategories
    index = categories.index(x)
    markCategory[index] = 1
    return markCategory
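A quick sanity check of the encoder, using the first category in the sorted list:

# the encoded list has length numCategories, with a single 1 at that category's index
print(categories[0], transform_category(categories[0]))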

5) With this treatment, the single page-category feature becomes 14 features, so 14 features are added overall. Next, include them when building the feature vectors:
for i in range(numRaw):
    trimmed = [each.replace('"', "") for each in data[i]]
    label = int(trimmed[-1])
    cate = transform_category(trimmed[3])  # one-hot list for this row's page category
    features = cate + map(lambda x: 0.0 if x == "?" else x, trimmed[4: numColumns-1])  # keep only columns 5 to 27
    c = (label, Vectors.dense(map(float, features)))
    data1.append(c)

6) Create the DataFrame:
df = spark.createDataFrame(data1, ["label", "features"])
df.cache()

7) Build the feature indexer:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24).fit(df)

8) Split the data into 80% training and 20% test sets:
(trainData, testData) = df.randomSplit([0.8, 0.2], seed=1234L)  # a fixed seed makes the split reproducible, so the train/test row counts stay the same across runs
trainData.count()
testData.count()

9) Create the decision tree classifier:
dt = DecisionTreeClassifier(maxDepth=5, labelCol="label", featuresCol="indexedFeatures", impurity="entropy")

10) Build the pipeline:
pipeline = Pipeline(stages = [featureIndexer, dt])
model = pipeline.fit(trainData)  # train the model

11) Measure the decision tree's accuracy again with the test data:
predictResultAll = model.transform(testData)
df_predict = predictResultAll.select(['prediction']).toPandas()
dtPredict = list(df_predict.prediction)

# tally how many predictions are correct
dtTotalCorrect = 0

# total number of rows in the test set
testRaw = testData.count()
# testLabel = testData.select("label").collect()  # this returns Row objects rather than a plain list (version issue?)
df_test = testData.select(['label']).toPandas()
testLabel = list(df_test.label)

for i in range(testRaw):
    if dtPredict[i] == testLabel[i]:
        dtTotalCorrect += 1

1.0 * dtTotalCorrect / testRaw 


11.8.2 Cross-Validation and the Parameter Grid
# import cross-validation and the parameter grid builder
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# import the binary classification evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()  # initialize an evaluator
# set up the parameter grid
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [4,5,6]).build()
# configure the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)  # use 3+ folds in practice
  
# train the model via cross-validation and choose the best set of parameters
cvModel = crossval.fit(trainData)
# evaluate the model on the test set
predictResultAll = cvModel.transform(testData)
df_predict = predictResultAll.select(['prediction']).toPandas()
dtPredict = list(df_predict.prediction)

dtTotalCorrect = 0
testRaw = testData.count()
df_test = testData.select(['label']).toPandas()
testLabel = list(df_test.label)

for i in range(testRaw):
    if dtPredict[i] == testLabel[i]:
        dtTotalCorrect += 1

1.0 * dtTotalCorrect / testRaw 
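The grid is not limited to maxDepth: other parameters can be searched at the same time, at the cost of training every combination in every fold. A sketch of a slightly larger grid (impurity added as a second dimension; an illustration, not part of the book's run) that can be fit exactly like crossval above:

# 3 depths x 2 impurity measures = 6 candidate parameter sets per fold
largerGrid = (ParamGridBuilder()
              .addGrid(dt.maxDepth, [4, 5, 6])
              .addGrid(dt.impurity, ["entropy", "gini"])
              .build())
crossval2 = CrossValidator(estimator=pipeline, estimatorParamMaps=largerGrid,
                           evaluator=evaluator, numFolds=3)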


We can also inspect the specific parameters of the best-fitting model:
bestModel = cvModel.bestModel.stages[1]
bestModel

bestModel.numFeatures  # the tree uses 36 features (22 original + 14 one-hot category features)

bestModel.depth  # the best model's maximum depth (one of 4, 5, 6 from the grid)

bestModel.numNodes  # number of nodes in the tree
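To see how each grid point actually scored during cross-validation, CrossValidatorModel exposes the average metric per parameter combination (avgMetrics; areaUnderROC by default for BinaryClassificationEvaluator, available in Spark 2.x):

# average cross-validation metric for each parameter combination in the grid
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, metric)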

11.9 Running as a Script (mind the tabs/indentation when copying; the runnable file is on Gitee)
# coding: utf-8
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession


if __name__ == "__main__":
    # run Spark locally
    sparkConf = SparkConf().setMaster("local[*]")
    sc = SparkContext(conf=sparkConf)
    spark = SparkSession.builder.master('local').appName("DecisionTree") \
        .config("spark.some.config.option", "some-value").getOrCreate()
    # ... the data loading, training and evaluation code from the sections above goes here ...


spark-submit ch11_decisionTree.py