spark-ml 邏輯迴歸使用記錄
最近做專案需要用一下邏輯迴歸模型,一開始用了spark的基於RDD的mlib,寫了一個版本的指令碼,後來看官方文件發現mlib目前處於維護狀態,主推的還是基於Dataframe的ml,於是重頭開始寫基於ml的模型指令碼,哎,堅強地繼續搬磚
由於中間有個指標是事件型別,屬於字串型別,需要做單獨處理,這裡利用自帶的StringIndexer函式, 主要是根據事件型別出現頻次來確定數值,handleInvalid主要針對以後新出現的事件型別時候的處理方式,有error, skip和keep,具體含義可以參考官方文件的示例
from pyspark.ml.feature import StringIndexer indexer=StringIndexer(inputCol='eventtype',outputCol='eventtypeIndex',handleInvalid='keep') indexed=indexer.fit(raw_data).transform(raw_data).select(...)
接下來生成標籤和特徵向量
from pyspark.ml.linalg import Vectors
data=indexed.rdd.map(lambda x:(int(x[-1]),Vectors.dense(x[:-1]))).toDF(['label','features'])
接下來做變數標準化處理
from pyspark.ml.feature import StandardScaler scaler=StandardScaler(inputCol='features',outputCol='scaledFeatures',withStd=True,withMean=True) scaledData=scaler.fit(data).transform(data)
準備訓練資料和測試資料
train,test=scaledData.randomSplit([0.9,0.1],seed=12345)
開始訓練模型
lr = LogisticRegression(maxIter=10,regParam=0.01,featuresCol='scaledFeatures')
lrModel=lr.fit(train)
現在可以看一下模型的相關引數結果,看了下coefficients結果比較符合業務邏輯
print lrModel.coefficients print lrModel.intercept trainingSummary=lrModel.summary print trainingSummary.areaUnderROC fMeasure=trainingSummary.fMeasureByThreshold
下面是要根據fMeasure值來確定最優的Threshold, 官方文件示例程式碼如下
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']).select('threshold').head()['threshold']
執行結果會報錯
開啟debug之旅~
首先看下fMeasure的資料情況
看了F-Measure屬於double型別,maxFMeasure['max(F-Measure)']屬於float型別,按道理應該判斷相等沒問題,本著實踐出真知的態度,我們還是來測試一下
這裡就奇怪了,竟然是字串格式才能匹配到資料,那很自然地想到進行型別轉化一下再試試
咦?怎麼還是不行?debug1分鐘,2分鐘,.... 發現str轉換小數後最多隻能保留12位小數,而我們的小數位數有16位,測試結果如下:
開動小腦筋實在想不出解決這個問題的好辦法,那就只能重新自己寫程式碼實現這個功能了
res_list = fMeasure.collect()
fMeasure_list = list()
threshold_list = list()
for each in res_list:
threshold_list.append(each['threshold'])
fMeasure_list.append(each['F-Measure'])
threshold_index = fMeasure_list.index(max(fMeasure_list))
bestThreshold = threshold_list[threshold_index]
lr.setThreshold(bestThreshold)
好了,訓練工作完成,開始進行測試模型效果
prediction=lrModel.transform(test)
spark2.3.0版本可以直接計算precision,recall等指標,然而我用的是2.2.0版本,所以這部分程式碼還是得自己實現
res_cnt=prediction.groupBy(['label','prediction']).count().collect()
tp,tn,fp,fn=0,0,0,0
for each in res_cnt:
label=int(each['label'])
prediction=int(each['prediction'])
cnt=int(each['count'])
if label==0 and prediction==0:
tn=cnt
elif label==0 and prediction==1:
fp=cnt
elif label==1 and prediction == 0:
fp=cnt
else:
tp=cnt
precision=tp/((tp+fp)*1.0)
recall=tp/((tp+fn)*1.0)
模型評價結果:
由於黑標籤較少,當黑白標籤1:1時候,precision=0.83, recall=0.98; 當黑白標籤1:10時候,precision=0.93,recall=0.31
模型引數的調教工作還沒做,涉及regParam, elasticNetParam正則化引數的選擇問題,後續繼續研究一下~