1. 程式人生 > >spark-ml 邏輯迴歸使用記錄

spark-ml 邏輯迴歸使用記錄

    最近做專案需要用一下邏輯迴歸模型,一開始用了spark的基於RDD的mlib,寫了一個版本的指令碼,後來看官方文件發現mlib目前處於維護狀態,主推的還是基於Dataframe的ml,於是重頭開始寫基於ml的模型指令碼,哎,堅強地繼續搬磚快哭了

    由於中間有個指標是事件型別,屬於字串型別,需要做單獨處理,這裡利用自帶的StringIndexer函式, 主要是根據事件型別出現頻次來確定數值,handleInvalid主要針對以後新出現的事件型別時候的處理方式,有error, skip和keep,具體含義可以參考官方文件的示例

from pyspark.ml.feature import StringIndexer
indexer=StringIndexer(inputCol='eventtype',outputCol='eventtypeIndex',handleInvalid='keep')
indexed=indexer.fit(raw_data).transform(raw_data).select(...)

接下來生成標籤和特徵向量

from pyspark.ml.linalg import Vectors
data=indexed.rdd.map(lambda x:(int(x[-1]),Vectors.dense(x[:-1]))).toDF(['label','features'])

接下來做變數標準化處理

from pyspark.ml.feature import StandardScaler
scaler=StandardScaler(inputCol='features',outputCol='scaledFeatures',withStd=True,withMean=True)
scaledData=scaler.fit(data).transform(data)

準備訓練資料和測試資料

train,test=scaledData.randomSplit([0.9,0.1],seed=12345)

開始訓練模型

lr = LogisticRegression(maxIter=10,regParam=0.01,featuresCol='scaledFeatures')
lrModel=lr.fit(train)

現在可以看一下模型的相關引數結果,看了下coefficients結果比較符合業務邏輯

print lrModel.coefficients
print lrModel.intercept
trainingSummary=lrModel.summary
print trainingSummary.areaUnderROC
fMeasure=trainingSummary.fMeasureByThreshold

下面是要根據fMeasure值來確定最優的Threshold, 官方文件示例程式碼如下

maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']).select('threshold').head()['threshold']

執行結果會報錯


開啟debug之旅~

首先看下fMeasure的資料情況


看了F-Measure屬於double型別,maxFMeasure['max(F-Measure)']屬於float型別,按道理應該判斷相等沒問題,本著實踐出真知的態度,我們還是來測試一下


這裡就奇怪了,竟然是字串格式才能匹配到資料,那很自然地想到進行型別轉化一下再試試


咦?怎麼還是不行?debug1分鐘,2分鐘,.... 發現str轉換小數後最多隻能保留12位小數,而我們的小數位數有16位,測試結果如下:


開動小腦筋實在想不出解決這個問題的好辦法,那就只能重新自己寫程式碼實現這個功能了

res_list = fMeasure.collect()
fMeasure_list = list()
threshold_list = list()
for each in res_list:
    threshold_list.append(each['threshold'])
    fMeasure_list.append(each['F-Measure'])
threshold_index = fMeasure_list.index(max(fMeasure_list))
bestThreshold = threshold_list[threshold_index]
lr.setThreshold(bestThreshold)

好了,訓練工作完成,開始進行測試模型效果

prediction=lrModel.transform(test)

spark2.3.0版本可以直接計算precision,recall等指標,然而我用的是2.2.0版本,所以這部分程式碼還是得自己實現

res_cnt=prediction.groupBy(['label','prediction']).count().collect()
tp,tn,fp,fn=0,0,0,0
for each in res_cnt:
    label=int(each['label'])
    prediction=int(each['prediction'])
    cnt=int(each['count'])
    if label==0 and prediction==0:
        tn=cnt
    elif label==0 and prediction==1:
        fp=cnt
    elif label==1 and prediction == 0:
        fp=cnt
    else:
        tp=cnt
precision=tp/((tp+fp)*1.0)
recall=tp/((tp+fn)*1.0)

模型評價結果:

由於黑標籤較少,當黑白標籤1:1時候,precision=0.83, recall=0.98; 當黑白標籤1:10時候,precision=0.93,recall=0.31

模型引數的調教工作還沒做,涉及regParam, elasticNetParam正則化引數的選擇問題,後續繼續研究一下~