在Apache Spark上跑Logistic Regression演算法及其中的一些錯誤問題
本文旨在介紹使用機器學習演算法,來介紹Apache Spark資料處理引擎。我們一開始會先簡單介紹一下Spark,然後我們將開始實踐一個機器學習的例子。我們將使用Qualitative Bankruptcy資料集,來自UCI機器學習資料倉庫。雖然Spark支援同時Java,Scala,Python和R,在本教程中我們將使用Scala作為程式語言。不用擔心你沒有使用Scala的經驗。練習中的每個程式碼段,我們都會詳細解釋一遍。
APACHE SPARK
Apache Spark是一個開源的叢集計算框架,用Spark編寫的應用程式可以比Hadoop MapReduce正規化的速度高100倍以上。Spark的一個主要的特點,基於記憶體,執行速度快,不僅如此,複雜應用在Spark系統上執行,也比基於磁碟的MapReduce更有效。Spark還旨在更通用,因此它提供了以下庫:
-
Spark SQL,處理結構化資料的模組
-
MLlib,可擴充套件的機器學習庫
-
GraphX,圖和圖的平行計算API
-
Spark Streaming,可擴充套件的,可容錯的流式計算程式
正如已經提到的,Spark支援Java,Scala,Python和R程式語言。它還集成了其他大資料工具。特別是,Spark可以執行在Hadoop叢集,可以訪問任何資料來源,包括Hadoop Cassandra。
Spark核心概念
在一個高的抽象層面,一個Spark的應用程式由一個驅動程式作為入口,在一個叢集上執行各種並行操作。驅動程式包含了你的應用程式的main函式,然後將這些應用程式分配給叢集成員執行。驅動程式通過SparkContext物件來訪問計算叢集。對於互動式的shell應用,SparkContext預設可通過sc變數訪問。
Spark的一個非常重要的概念是RDD–彈性分散式資料集。這是一個不可改變的物件集合。每個RDD會分成多個分割槽,每個分割槽可能在不同的群集節點上參與計算。RDD可以包含任何型別的Java,Scala物件,Python或R,包括使用者自定義的類。RDDS的產生有兩種基本方式:通過載入外部資料集或分配物件的集合如,list或set。
在建立了RDDs之後,我們可以對RDDs做2種不同型別的操作:
-
Transformations - 轉換操作,從一個RDD轉換成另外一個RDD
-
Actions - 動作操作,通過RDD計算結果
RDDs通過lazy的方式計算 - 即當RDDs碰到Action操作時,才會開始計算。Spark的Transformations操作,都會積累成一條鏈,只有當需要資料的時候,才會執行這些Transformations操作。每一次RDD進行Action操作時,RDD都會重新生成。如果你希望某些中間的計算結果能被其他的Action操作複用,那麼你需要呼叫Spark的RDD.persist()來儲存中間資料。
Spark支援多種執行模式,你可以使用互動式的Shell,或者單獨執行一個standalone的Spark程式。不管哪一種方式,你都會有如下的工作流:
-
輸入資料,用於生成RDD
-
使用Transformations操作轉換資料集
-
讓Spark儲存一些中間計算結果,用於複用計算
-
使用Action操作,讓Spark平行計算。Spark內部會自動優化和執行計算任務。
安裝Apache Spark
為了開始使用Spark,需要先從官網下載。選擇“Pre-built for Hadoop 2.4 and later”版本然後點選“Direct Download”。如果是Windows使用者,建議將Spark放進名字沒有空格的資料夾中。比如說,將檔案解壓到:C:\spark。
正如上面所說的,我們將會使用Scala程式語言。進入Spark的安裝路徑,執行如下命令:
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">// Linux and Mac users bin/spark-shell // Windows users bin\spark shell</span>
然後你可以在控制檯中看到Scala:
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">scala></span>
QUALITATIVE破產分類
現實生活中的問題是可以用機器學習演算法來預測的。我們將試圖解決的,通過一個公司的定性資訊,預測該公司是否會破產。資料集可以從UCI機器學習庫https://archive.ics.uci.edu/ml/datasets/qualitative_bankruptcy下載。在Spark的安裝資料夾中,建立一個新的資料夾命名為playground。複製qualitative_bankruptcy.data.txt檔案到這裡面。這將是我們的訓練資料。
這兒需要將檔案上傳到hdfs上,命令為:
hadoop dfs -copyFromLocal qualitative_bankruptcy.data.txt ./
資料集包含250個例項,其中143個例項為非破產,107個破產例項。
每一個例項資料格式如下:
-
工業風險
-
管理風險
-
財務靈活性
-
信譽
-
競爭力
-
經營風險
這些被稱為定性引數,因為它們不能被表示為一個數字。每一個引數可以取下以下值:
-
P positive
-
A average
-
N negative
資料集的最後一個列是每個例項的分類:B為破產或NB非破產。
鑑於此資料集,我們必須訓練一個模型,它可以用來分類新的資料例項,這是一個典型的分類問題。
解決問題的步驟如下:
-
從qualitative_bankruptcy.data.txt檔案中讀取資料
-
解析每一個qualitative值,並將其轉換為double型數值。這是我們的分類演算法所需要的
-
將資料集劃分為訓練和測試資料集
-
使用訓練資料訓練模型
-
計算測試資料的訓練誤差
SPARK LOGISTIC REGRESSION
我們將用Spark的邏輯迴歸演算法訓練分類模型。如果你想知道更多邏輯迴歸演算法的原理,你可以閱讀以下教程http://technobium.com/logistic-regression-using-apache-mahout。
在Spark的Scala Shell中貼上以下import語句:
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#ff0000;">import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}</span><span style="color:#3e3e3e;"> </span><span style="color:#000099;">這找了半天沒有找到<span style="line-height: 25.6000003814697px; font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif;">LogisticRegressionWithLBFGS,所以一直報錯,最後改用LinearRegressionWithSGD,因此命令變為:</span></span></span>
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="font-family:Helvetica Neue, Helvetica, Hiragino Sans GB, Microsoft YaHei, Arial, sans-serif;"></span></span><pre name="code" style="margin-top: 0px; margin-bottom: 0px; font-size: 16px; line-height: 25.6000003814697px; font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif; padding: 0px; max-width: 100%; box-sizing: border-box !important; word-wrap: break-word !important; background-color: rgb(255, 255, 255);"><span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#000099;">import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}</span></span>
import org.apache.spark.mllib.regression.LabeledPointimport org.apache.spark.mllib.linalg.{Vector, Vectors}
這將匯入所需的庫。
接下來我們將建立一個Scala函式,將資料集中的qualitative資料轉換為Double型數值。鍵入或貼上以下程式碼並回車,在Spark Scala Shell。
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">def getDoubleValue( input:String ) : Double = { var result:Double = 0.0 if (input == "P") result = 3.0 if (input == "A") result = 2.0 if (input == "N") result = 1.0 if (input == "NB") result = 1.0 if (input == "B") result = 0.0 return result }</span>
如果所有的執行都沒有問題,你應該看到這樣的輸出:
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">getDoubleValue: (input: String)Double</span>
現在,我們可以讀取到qualitative_bankruptcy.data.txt檔案中的資料。從Spark的角度來看,這是一個Transformation操作。在這個階段,資料實際上不被讀入記憶體。如前所述,這是一個lazy的方式執行。實際的讀取操作是由count()引發,這是一個Action操作。
我把檔案放到了hdfs的根目錄下,所以下面的命令改為:
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#000099;">val data = sc.textFile("Qualitative_Bankruptcy.data.txt")</span></span>
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#cc0000;">val data = sc.textFile("playground/Qualitative_Bankruptcy.data.txt")</span><span style="color:#3e3e3e;"> data.count()</span></span>
用我們val關鍵字宣告一個常量data。它是一個包含輸入資料所有行的RDD。讀操作被SC或sparkcontext上下文變數監聽。count操作應返回以下結果:
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">res0: Long = 250</span>
現在是時候為邏輯迴歸演算法準備資料,將字串轉換為數值型。
下面命令有錯誤的地方,下面已經更正,注意標紅的地方。
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#3e3e3e;">val parsedData = data.map{line</span><span style="color:#ff0000;"> =></span><span style="color:#3e3e3e;"> val parts = line.split(",") LabeledPoint(getDoubleValue(parts(6)), Vectors.dense(parts.slice(0,6).map(x </span><span style="color:#cc0000;">=></span><span style="color:#3e3e3e;">getDoubleValue(x)))) }</span></span>
在這裡,我們聲明瞭另外一個常量,命名為parsedData。對於data變數中的每一行資料,我們將做以下操作:
-
使用“,”拆分字串,並獲得一個向量,命名為parts
-
建立並返回一個LabeledPoint物件。每個LabeledPoint包含標籤和值的向量。在我們的訓練資料,標籤或類別(破產或非破產)放在最後一列,陣列下標0到6。這是我們使用的parts(6)。在儲存標籤之前,我們將用getDoubleValue()函式將字串轉換為Double型。其餘的值也被轉換為Double型數值,並儲存在一個名為稠密向量的資料結構。這也是Spark的邏輯迴歸演算法所需要的資料結構。
Spark支援map()轉換操作,Action動作執行時,第一個執行的就是map()。
我們來看看我們準備好的資料,使用take():
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">parsedData.take(10)</span>
上面的程式碼,告訴Spark從parsedData陣列中取出10個樣本,並列印到控制檯。一樣的,take()操作之前,會先執行map()。輸出結果如下:
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">res5: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((1.0,[3.0,3.0,2.0,2.0,2.0,3.0]), (1.0,[1.0,1.0,2.0,2.0,2.0,1.0]), (1.0,[2.0,2.0,2.0,2.0,2.0,2.0]), (1.0,[3.0,3.0,3.0,3.0,3.0,3.0]), (1.0,[1.0,1.0,3.0,3.0,3.0,1.0]), (1.0,[2.0,2.0,3.0,3.0,3.0,2.0]), (1.0,[3.0,3.0,2.0,3.0,3.0,3.0]), (1.0,[3.0,3.0,3.0,2.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,3.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,2.0,3.0,3.0]))</span>
接著我們劃分一下訓練資料和測試資料,將parsedData的60%分為訓練資料,40%分為測試資料。
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L) val trainingData = splits(0) val testData = splits(1)</span>
訓練資料和測試資料也可以像上面一樣,使用take()者count()檢視。
激動人心的時刻,我們現在開始使用Spark的LogisticRegressioinWithLBFGS()來訓練模型。設定好分類個數,這裡是2個(破產和非破產):
由於改用LogisticRegressionWithSGD,所以下面的語句就會變化:
LogisticRegressionWithSGD的使用方法,主要參考
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#ff0000;">val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)(這句要去掉,改為下面兩句)</span></span>
val numIterations = 20
val model = LogisticRegressionWithSGD.train(parsedData,
numIterations)
當模型訓練完,我們可以使用testData來檢驗一下模型的出錯率。
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;"><span style="color:#3e3e3e;">val labelAndPreds = testData.map { point = val prediction = model.predict(point.features) (point.label, prediction) } </span><span style="color:#000099;">下面這句話有誤,注意標紅的部分,原來為=,更正為=>:</span><span style="color:#3e3e3e;"> val trainErr = labelAndPreds.filter(r </span><span style="color:#cc0000;">=></span><span style="color:#3e3e3e;"> r._1 != r._2).count.toDouble / testData.count</span></span>
變數labelAndPreds儲存了map()轉換操作,map()將每一個行轉換成二元組。二元組包含了testData的標籤資料(point.label,分類資料)和預測出來的分類資料(prediction)。模型使用point.features作為輸入資料。
最後一行程式碼,我們使用filter()轉換操作和count()動作操作來計算模型出錯率。filter()中,保留預測分類和所屬分類不一致的元組。在Scala中_1和_2可以用來訪問元組的第一個元素和第二個元素。最後用預測出錯的數量除以testData訓練集的數量,我們可以得到模型出錯率:
<span style="margin: 0px; padding: 0px; max-width: 100%; font-size: 14px; box-sizing: border-box !important; word-wrap: break-word !important;">trainErr: Double = 0.20430107526881722</span>
總結
在這個教程中,你已經看到了Apache Spark可以用於機器學習的任務,如logistic regression。雖然這只是非分散式的單機環境的Scala shell demo,但是Spark的真正強大在於分散式下的記憶體並行處理能力。
在大資料領域,Spark是目前最活躍的開源專案,在過去幾年已迅速獲得關注和發展。在過去的幾年裡。採訪了超過2100受訪者,各種各樣的使用情況和環境。
[參考資料]
“Learning Spark” by HoldenKarau, Andy Konwinski, Patrick Wendell and Matei Zaharia, O’Reilly Media 2015
Lichman, M. (2013). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression
https://spark.apache.org/docs/1.1.0/mllib-data-types.html
https://archive.ics.uci.edu/ml/datasets/Qualitative_Bankruptcy
https://databricks.com/blog/2015/01/27/big-data-projects-are-hungry-for-simpler-and-more-powerful-tools-survey-validates-apache-spark-is-gaining-developer-traction.html
附:
執行的程式碼:
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
def getDoubleValue( input:String ) : Double = {
var result:Double = 0.0
if (input == "P") result = 3.0
if (input == "A") result = 2.0
if (input == "N") result = 1.0
if (input == "NB") result = 1.0
if (input == "B") result = 0.0
return result
}
val data = sc.textFile("Qualitative_Bankruptcy.data.txt")
data.count()
val parsedData = data.map{line =>
val parts = line.split(",")
LabeledPoint(getDoubleValue(parts(6)), Vectors.dense(parts.slice(0,6).map(x =>getDoubleValue(x))))
}
parsedData.take(10)
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val trainingData = splits(0)
val testData = splits(1)
val numIterations = 20
val model = LogisticRegressionWithSGD.train(parsedData, numIterations)
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count