Spark 機器學習實踐 :Iris資料集的分類
今天試用了一下Spark的機器學習,體驗如下:
第一步,匯入資料
我們使用Iris資料集,做一個分類,首先要把csv檔案匯入。這裡用到了spark的csv包,不明白為什麼這麼常見的功能不是內建的,還需要額外載入。
--packages com.databricks:spark-csv_2.11:1.4.0
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format('com.databricks.spark.csv') .options(header='true', inferschema='true') .load('iris.csv') # Displays the content of the DataFrame to stdout df.show()
結果如下:
+-----+------------+-----------+------------+-----------+-------+ |rowid|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species| +-----+------------+-----------+------------+-----------+-------+ | 1| 5.1| 3.5| 1.4| 0.2| setosa| | 2| 4.9| 3.0| 1.4| 0.2| setosa| | 3| 4.7| 3.2| 1.3| 0.2| setosa| | 4| 4.6| 3.1| 1.5| 0.2| setosa| | 5| 5.0| 3.6| 1.4| 0.2| setosa| | 6| 5.4| 3.9| 1.7| 0.4| setosa| | 7| 4.6| 3.4| 1.4| 0.3| setosa| | 8| 5.0| 3.4| 1.5| 0.2| setosa| | 9| 4.4| 2.9| 1.4| 0.2| setosa| | 10| 4.9| 3.1| 1.5| 0.1| setosa| | 11| 5.4| 3.7| 1.5| 0.2| setosa| | 12| 4.8| 3.4| 1.6| 0.2| setosa| | 13| 4.8| 3.0| 1.4| 0.1| setosa| | 14| 4.3| 3.0| 1.1| 0.1| setosa| | 15| 5.8| 4.0| 1.2| 0.2| setosa| | 16| 5.7| 4.4| 1.5| 0.4| setosa| | 17| 5.4| 3.9| 1.3| 0.4| setosa| | 18| 5.1| 3.5| 1.4| 0.3| setosa| | 19| 5.7| 3.8| 1.7| 0.3| setosa| | 20| 5.1| 3.8| 1.5| 0.3| setosa| +-----+------------+-----------+------------+-----------+-------+ only showing top 20 rows
第二步,提取特徵
Spark要求把分類的標籤(label)轉換成數值進行計算,這一點沒有scklearn方便。Spark提供了StringIndexer功能,可以把字串轉換為索引值。
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Species", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
提取後,categoryIndex這一列裡面就是Species的索引值。
+-----+------------+-----------+------------+-----------+-------+-------------+
|rowid|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species|categoryIndex|
+-----+------------+-----------+------------+-----------+-------+-------------+
| 1| 5.1| 3.5| 1.4| 0.2| setosa| 2.0|
| 2| 4.9| 3.0| 1.4| 0.2| setosa| 2.0|
| 3| 4.7| 3.2| 1.3| 0.2| setosa| 2.0|
| 4| 4.6| 3.1| 1.5| 0.2| setosa| 2.0|
| 5| 5.0| 3.6| 1.4| 0.2| setosa| 2.0|
| 6| 5.4| 3.9| 1.7| 0.4| setosa| 2.0|
| 7| 4.6| 3.4| 1.4| 0.3| setosa| 2.0|
| 8| 5.0| 3.4| 1.5| 0.2| setosa| 2.0|
| 9| 4.4| 2.9| 1.4| 0.2| setosa| 2.0|
| 10| 4.9| 3.1| 1.5| 0.1| setosa| 2.0|
| 11| 5.4| 3.7| 1.5| 0.2| setosa| 2.0|
| 12| 4.8| 3.4| 1.6| 0.2| setosa| 2.0|
| 13| 4.8| 3.0| 1.4| 0.1| setosa| 2.0|
| 14| 4.3| 3.0| 1.1| 0.1| setosa| 2.0|
| 15| 5.8| 4.0| 1.2| 0.2| setosa| 2.0|
| 16| 5.7| 4.4| 1.5| 0.4| setosa| 2.0|
| 17| 5.4| 3.9| 1.3| 0.4| setosa| 2.0|
| 18| 5.1| 3.5| 1.4| 0.3| setosa| 2.0|
| 19| 5.7| 3.8| 1.7| 0.3| setosa| 2.0|
| 20| 5.1| 3.8| 1.5| 0.3| setosa| 2.0|
+-----+------------+-----------+------------+-----------+-------+-------------+
only showing top 20 rows
第三步,模型訓練和驗證:
from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import NaiveBayes
# Load and parse the data
def parseRow(row):
return Row(label=row["categoryIndex"],
features=Vectors.dense([row["Sepal.Length"],
row["Sepal.Width"],
row["Petal.Length"],
row["Petal.Width"]]))
## Must convert to dataframe after mapping
parsedData = indexed.map(parseRow).toDF()
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(parsedData)
predict_data = model.transform(parsedData)
traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count()
total = predict_data.count()
print traing_err, total, float(traing_err)/total
結果如下,在150個樣本的訓練集上,有7個預測錯誤:
7 150 0.0466666666667
這裡要注意幾點:
-
Spark有兩組機器學習的介面pyspark.ml和pyspark.mllib, 前一個是1.3推出的,比較新,功能也更豐富,後一個是0.9版本推出的,功能少一些。這兩組API是不相容的,你可以選一組來使用。
-
新的介面要求資料集的型別是dataframe
這裡把試用mllib的樣例也放出來,大家可以比較一下。
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
# Load and parse the data
def parseRow(row):
return LabeledPoint(row["categoryIndex"],
Vectors.dense([row["Sepal.Length"],row["Sepal.Width"],row["Petal.Length"],row["Petal.Width"]]))
## Must convert to dataframe after mapping
parsedData = indexed.map(parseRow)
nb = NaiveBayes()
model = nb.train(parsedData)
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
結果和使用ml的是一樣的:
Training Error = 0.0466666666667
然後我試用了SVM,程式碼如下:
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
# Load and parse the data
def parseRow(row):
return LabeledPoint(row["categoryIndex"],
Vectors.dense([row["Sepal.Length"],row["Sepal.Width"],row["Petal.Length"],row["Petal.Width"]]))
## Must convert to dataframe after mapping
parsedData = indexed.map(parseRow)
nb = SVMWithSGD()
model = nb.train(parsedData, iterations=10)
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
結果出錯了:
Py4JJavaError: An error occurred while calling o3397.trainSVMModelWithSGD.
: org.apache.spark.SparkException: Input validation failed.
at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:251)
at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRegressionModel(PythonMLLibAPI.scala:94)
at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainSVMModelWithSGD(PythonMLLibAPI.scala:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
從這個結果完全看不出任何端倪。後來查詢到了後臺日誌發現了原因:
ERROR DataValidators: Classification labels should be 0 or 1. Found 50 invalid labels
原來Spark實現的SVM方法只能支援二分類,不支援大於二的分類。這個有點坑呀,scklearn好像是支援的。雖然SVM理論是基於二元分類的,但是有辦法擴充套件。
最後分享一個我在提取分類索引的時候的一個坑,因為覺得字串對映為數值本身邏輯比較簡單,我就自己實現了一個,然後去做map。
## ??? does not work
labels = dict()
def get_label(s):
if labels.get(s) is None:
print s
l = len(labels)
labels[s] = l
return labels.get(s)
# Load and parse the data
def parsePoint(row):
return LabeledPoint(get_label(row["Species"]), [row["Sepal.Length"],row["Sepal.Width"]])
parsedData = df.map(parsePoint)
然而這樣做是錯的,因為傳入map的labels是immutable的,在map方法中是無法修改labels的值的。這樣才能保證在分散式執行是的無狀態和並行。大家以後用的時候要小心。