Building Logistic Regression on Spark
from __future__ import print_function
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import StandardScaler
print ('666')
sc=SparkContext()
good=sc.textFile("hdfs://master:9000/usr/bad/test")
print ('888')
df=good.map(lambda x:x.split(','))
i=df.take(2)
print(i)
sqlContext=SQLContext(sc)
data=sqlContext.createDataFrame(df,["id","b","c","d","e","f","g","h","i","j","k"])
data.show(6,False)
data.printSchema()
label_set = data.map(lambda x: x[10]).distinct().collect()
label_dict = dict()  # create an empty dictionary for mapping label values to integer indices
i = 0
for key in label_set:
    if key not in label_dict:  # if this label has not been seen yet, record it with the next integer index
        label_dict[key] = i
        i = i + 1
print(label_dict)
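# For reference: the same label-to-index mapping can be built with StringIndexer from
# pyspark.ml.feature instead of the manual loop above. A minimal sketch, assuming the raw
# label column is "k" as named above; "labelIndex" is just an illustrative output column name.
# Note that StringIndexer assigns indices by label frequency, so they may differ from label_dict.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="k", outputCol="labelIndex")
indexed = indexer.fit(data).transform(data)
indexed.select("k", "labelIndex").show(5, False)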
print('999')
# The first map splits each row into x (the first ten columns) and y (the label index);
# the second map casts the values in x to numeric types and leaves y unchanged.
data1 = data.map(lambda x: ([x[i] for i in range(10)], label_dict[x[10]])) \
            .map(lambda p: [float(p[0][0]), float(p[0][1]), float(p[0][2]), float(p[0][3]), float(p[0][4]),
                            float(p[0][5]), float(p[0][6]), float(p[0][7]),
                            int(float(p[0][8])), int(float(p[0][9])), p[1]])
print('1010')
data2 = sqlContext.createDataFrame(data1,['a',"b","c","d","e","f","g","h","i","j","id"])
print('111111')
data2.show(5, False)
data2.printSchema()
print('8595')
data3 = data2.map(lambda x: (Vectors.dense([x[i] for i in range(0, 10)]), float(x[10])))  # combine all independent variables into one feature vector, the input format the algorithm expects
print(data3)
feature_data = sqlContext.createDataFrame(data3, ['features', 'id'])
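# For reference: VectorAssembler can build the same 'features' column directly from data2,
# without the hand-written map above. A rough sketch using the column names defined earlier.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'],
                            outputCol='features')
assembled = assembler.transform(data2).select('features', 'id')
assembled.show(5, False)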
print('7878')
train_data, test_data = feature_data.randomSplit([0.7, 0.3], 6)  # split the data set into training and test sets (seed = 6)
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures',withStd=True, withMean=False)
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
lr = LogisticRegression(maxIter=10, featuresCol='pcaFeatures', labelCol='id')
print('6565')
pipeline = Pipeline(stages=[scaler, pca, lr])
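# CrossValidator and ParamGridBuilder are imported above but never used. If hyperparameter
# tuning is wanted, a minimal sketch of wrapping this pipeline could look like the following;
# the grid values and fold count are illustrative assumptions, and the plain fit below is kept.
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1.0]).build()
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(labelCol='id'), numFolds=3)
# cvModel = cv.fit(train_data)  # would replace the pipeline.fit(train_data) call below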
Model = pipeline.fit(train_data)
results = Model.transform(test_data)
results.show()
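# BinaryClassificationEvaluator is imported above but never used. Assuming the label in 'id'
# is binary, a short sketch of scoring the predictions by area under the ROC curve:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='id')
print(evaluator.evaluate(results))  # default metric is areaUnderROC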
Finally, evaluate the model by building a confusion matrix.
from pyspark.mllib.evaluation import MulticlassMetrics
# Pair each prediction with the true label ('id') as (prediction, label) floats, the input MulticlassMetrics expects.
predictionAndLabels = results.select('prediction', 'id').map(lambda x: (float(x[0]), float(x[1])))
metrics = MulticlassMetrics(predictionAndLabels)
table = metrics.confusionMatrix().toArray()
print(table)
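# Beyond the raw confusion matrix, MulticlassMetrics can report per-class scores.
# A short sketch, assuming the class indexed as 1.0 by label_dict is the positive class:
print(metrics.precision(1.0))  # precision for class 1.0
print(metrics.recall(1.0))     # recall for class 1.0
print(metrics.fMeasure(1.0))   # F1 score for class 1.0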
Note: the code was copied from someone else; only the data was swapped for my own.