如何基於spark做深度學習:從ML到keras、Elephas
http://blog.csdn.net/Richard_More/article/details/53215142
Elephas的網址:https://github.com/maxpumperla/elephas分散式深層神經網路的Spark ML模型管線
該筆記本描述瞭如何使用Spark ML為分散式版本的Keras深度學習模型構建機器學習流水線。作為資料集,我們使用來自Kaggle的Otto產品分類挑戰。我們選擇這個資料的原因是它很小,結構非常好。這樣,我們可以更多地關注技術元件,而不是進行復雜的處理。此外,具有較慢硬體或沒有完整的Spark群集的使用者應該能夠在本地執行此示例,並且仍然會了解有關分散式模式的許多內容。
通常,模型訓練不需要分配計算,而是建立資料流水線,即攝入,轉換等。在訓練中,深層神經網路往往在一臺機器上的一個或多個GPU上做得相當好。大多數情況下,使用梯度下降方法,您將一個接一個地處理。即使如此,使用像Spark這樣的框架也可能有益於將您的模型與您的周邊基礎架構相整合。除此之外,Spark ML管道提供的便利性非常有價值(在語法上非常接近你所知道的scikit-learn
)。
TL; DR:我們將展示如何使用分散式深層神經網路和Spark ML管道來解決分類問題,這個例子基本上是這裡發現的分散式版本。
使用這個筆記本
當我們要使用elephas時,您將需要訪問正在執行的Spark上下文才能執行此筆記本。SPARK_HOME
到您的路徑並啟動您的ipython
/ jupyter筆記本如下:
IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --driver-memory 4G elephas/examples/Spark_ML_Pipeline.ipynb
- 1
- 1
要測試您的環境,請嘗試列印Spark上下文(提供sc
),即執行以下單元格。
from __future__ import print_function
print(sc)
- 1
- 2
- 1
- 2
<pyspark.context.SparkContext object at 0x1132d61d0>
奧托產品分類資料
培訓和測試資料在這裡可用。繼續下載資料。檢查它,您將看到提供的csv檔案包含一個id列,93個整數特徵列。train.csv
有一個額外的標籤欄,test.csv
缺少。挑戰是準確預測測試標籤。對於本筆記本的其餘部分,我們將假設儲存資料data_path
,您應根據需要修改下面的資料。
data_path = "./" # <-- Make sure to adapt this to where your csv files are.
- 1
- 1
載入資料比較簡單,但是我們要照顧幾件事情。首先,雖然你可以洗牌RDD,但通常不是很有效率。但是由於資料train.csv
按類別排序,所以我們必須洗牌才能使模型執行良好。這是shuffle_csv
下面的功能。接下來,我們用明文讀入load_data_rdd
,以逗號分割,並將要素轉換為浮點型向量。另外請注意,最後一列train.csv
表示具有Class_
字首的類別。
定義資料幀
Spark有一些核心的資料結構,其中包括data frame
,這是一個分散式版本的命名列資料結構,現在很多都是來自R或熊貓。我們需要一個所謂的SQLContext
和可選的列到名稱對映來建立從頭開始的資料框架。
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors
import numpy as np
import random
sql_context = SQLContext(sc)
def shuffle_csv(csv_file):
lines = open(csv_file).readlines()
random.shuffle(lines)
open(csv_file, 'w').writelines(lines)
def load_data_frame(csv_file, shuffle=True, train=True):
if shuffle:
shuffle_csv(csv_file)
data = sc.textFile(data_path + csv_file) # This is an RDD, which will later be transformed to a data frame
data = data.filter(lambda x:x.split(',')[0] != 'id').map(lambda line: line.split(','))
if train:
data = data.map(
lambda line: (Vectors.dense(np.asarray(line[1:-1]).astype(np.float32)),
str(line[-1])) )
else:
# Test data gets dummy labels. We need the same structure as in Train data
data = data.map( lambda line: (Vectors.dense(np.asarray(line[1:]).astype(np.float32)),"Class_1") )
return sqlContext.createDataFrame(data, ['features', 'category'])
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
我們載入訓練和測試資料,並使用方便的show
方法列印幾行資料。
train_df = load_data_frame("train.csv")
test_df = load_data_frame("test.csv", shuffle=False, train=False) # No need to shuffle test data
print("Train data frame:")
train_df.show(10)
print("Test data frame (note the dummy category):")
test_df.show(10)
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
Train data frame:
+--------------------+--------+
| features|category|
+--------------------+--------+
|[0.0,0.0,0.0,0.0,...| Class_8|
|[0.0,0.0,0.0,0.0,...| Class_8|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,1.0,0.0,1.0,...| Class_6|
|[0.0,0.0,0.0,0.0,...| Class_9|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_3|
|[0.0,0.0,4.0,0.0,...| Class_8|
|[0.0,0.0,0.0,0.0,...| Class_7|
+--------------------+--------+
only showing top 10 rows
Test data frame (note the dummy category):
+--------------------+--------+
| features|category|
+--------------------+--------+
|[1.0,0.0,0.0,1.0,...| Class_1|
|[0.0,1.0,13.0,1.0...| Class_1|
|[0.0,0.0,1.0,1.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[2.0,0.0,5.0,1.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,1.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
+--------------------+--------+
only showing top 10 rows
預處理:定義變壓器
到目前為止,我們基本上只讀原始資料。幸運的是,Spark ML
有很多預處理功能可用,所以我們唯一要做的就是定義資料幀的轉換。
要繼續,我們將首先將類別字串轉換為雙精度值。這是由一個所謂的StringIndexer
。請注意,我們已經在這裡進行了實際的轉型,但這只是為了演示的目的。我們真正需要的是太多的定義,string_indexer
以便稍後再進行管理。
from pyspark.ml.feature import StringIndexer
string_indexer = StringIndexer(inputCol="category", outputCol="index_category")
fitted_indexer = string_indexer.fit(train_df)
indexed_df = fitted_indexer.transform(train_df)
- 1
- 2
- 3
- 4
- 五
- 1
- 2
- 3
- 4
- 五
接下來,將功能規範化,這是一個很好的做法StandardScaler
。
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
fitted_scaler = scaler.fit(indexed_df)
scaled_df = fitted_scaler.transform(indexed_df)
- 1
- 2
- 3
- 4
- 五
- 1
- 2
- 3
- 4
- 五
print("The result of indexing and scaling. Each transformation adds new columns to the data frame:")
scaled_df.show(10)
- 1
- 2
- 1
- 2
The result of indexing and scaling. Each transformation adds new columns to the data frame:
+--------------------+--------+--------------+--------------------+
| features|category|index_category| scaled_features|
+--------------------+--------+--------------+--------------------+
|[0.0,0.0,0.0,0.0,...| Class_8| 2.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_8| 2.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2| 0.0|[-0.2535060296260...|
|[0.0,1.0,0.0,1.0,...| Class_6| 1.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_9| 4.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2| 0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2| 0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_3| 3.0|[-0.2535060296260...|
|[0.0,0.0,4.0,0.0,...| Class_8| 2.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_7| 5.0|[-0.2535060296260...|
+--------------------+--------+--------------+--------------------+
only showing top 10 rows
Keras深度學習模式
現在我們有一個具有處理特徵和標籤的資料框架,我們定義一個深層神經網路,我們可以使用它來解決分類問題。你有機會來這裡,因為你知道一兩件關於深入學習的東西。如果是這樣,下面的模型看起來很簡單。我們通過選擇一組三個連續的密集層來建立一個keras模型,其中包含退出和ReLU啟用。對於這個問題肯定有更好的架構,但是我們只是想在這裡展示一般的流程。
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.utils import np_utils, generic_utils
nb_classes = train_df.select("category").distinct().count()
input_dim = len(train_df.select("features").first()[0])
model = Sequential()
model.add(Dense(512, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
分散式Elephas模型
為了將上述Keras提升model
到Spark,我們定義了一個Estimator
。一個Estimator
是,仍然有被訓練模型的星火的化身。它本質上只有一個(必需)的方法,即fit
。一旦我們呼叫fit
了資料框架,我們就回到了一個Model
,這是一個訓練有素的模型,transform
用來預測標籤的方法。
我們通過初始化ElephasEstimator
和設定一些屬性來實現。到目前為止,我們的輸入資料框架將有很多列,我們必須通過列名告訴模型在哪裡查詢功能和標籤。然後我們提供Keras模型和Elephas優化器的序列化版本。我們不能直接插入keras模型Estimator
,因為Spark將不得不將其序列化為與工作人員的溝通,所以最好自己提供序列化。事實上,雖然pyspark知道如何序列化model
,但它是非常低效的,如果模型變得太大,可能會破裂。Spark
ML對引數特別挑剔(正確地),或多或少地禁止您提供後者的非原子型別和陣列。大多數剩餘的引數是可選的,而且是自我解釋的。加,許多人,你知道如果你以前曾經執行過克拉斯模型。我們只是將他們包括在內,以顯示全套培訓配置。
from elephas.ml_model import ElephasEstimator
from elephas import optimizers as elephas_optimizers
# Define elephas optimizer (which tells the model how to aggregate updates on the Spark master)
adadelta = elephas_optimizers.Adadelta()
# Initialize SparkML Estimator and set all relevant properties
estimator = ElephasEstimator()
estimator.setFeaturesCol("scaled_features") # These two come directly from pyspark,
estimator.setLabelCol("index_category") # hence the camel case. Sorry :)
estimator.set_keras_model_config(model.to_yaml()) # Provide serialized Keras model
estimator.set_optimizer_config(adadelta.get_config()) # Provide serialized Elephas optimizer
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(1) # We just use one worker here. Feel free to adapt it.
estimator.set_nb_epoch(20)
estimator.set_batch_size(128)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
ElephasEstimator_415398ab22cb1699f794
SparkML管道
現在的簡單部分:定義管道真的像列出流水線階段一樣簡單。我們可以提供Transformers
和Estimators
真正的任何配置,但是這裡我們只需要先前定義的三個元件。請注意,string_indexer
並scaler
和互換,而estimator
有些明明已經來到最後的管道。
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[string_indexer, scaler, estimator])
- 1
- 2
- 3
- 1
- 2
- 3
安裝和評估管道
現在的最後一步是適應管道的培訓資料和評估。我們對訓練資料進行評估,即轉換,因為只有在這種情況下,我們有標籤來檢查模型的準確性。如果你喜歡,你也可以改造test_df
。
from pyspark.mllib.evaluation import MulticlassMetrics
fitted_pipeline = pipeline.fit(train_df) # Fit model to data
prediction = fitted_pipeline.transform(train_df) # Evaluate on train data.
# prediction = fitted_pipeline.transform(test_df) # <-- The same code evaluates test data.
pnl = prediction.select("index_category", "prediction")
pnl.show(100)
prediction_and_label = pnl.map(lambda row: (row.index_category, row.prediction))
metrics = MulticlassMetrics(prediction_and_label)
print(metrics.precision())
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 1
- 2
- 3
- 4
- 五
- 6
- 7
- 8
- 9
- 10
- 11
- 12
61878/61878 [==============================] - 0s
+--------------+----------+
|index_category|prediction|
+--------------+----------+
| 2.0| 2.0|
| 2.0| 2.0|
| 0.0| 0.0|
| 1.0| 1.0|
| 4.0| 4.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 3.0| 3.0|
| 2.0| 2.0|
| 5.0| 0.0|
| 0.0| 0.0|
| 4.0| 4.0|
| 0.0| 0.0|
| 4.0| 1.0|
| 2.0| 2.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 6.0| 0.0|
| 2.0| 2.0|
| 1.0| 1.0|
| 2.0| 2.0|
| 8.0| 8.0|
| 1.0| 1.0|
| 5.0| 0.0|
| 0.0| 0.0|
| 0.0| 3.0|
| 0.0| 0.0|
| 1.0| 1.0|
| 4.0| 4.0|
| 2.0| 2.0|
| 0.0| 3.0|
| 3.0| 3.0|
| 0.0| 0.0|
| 3.0| 0.0|
| 1.0| 5.0|
| 3.0| 3.0|
| 2.0| 2.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 2.0| 2.0|
| 2.0| 2.0|
| 1.0| 1.0|
| 6.0| 6.0|
| 1.0| 1.0|
| 0.0| 3.0|
| 7.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 6.0| 6.0|
| 0.0| 0.0|
| 0.0| 3.0|
| 2.0| 2.0|
| 0.0| 0.0|
| 2.0| 2.0|
| 0.0| 0.0|
| 4.0| 4.0|
| 0.0| 0.0|
| 6.0| 6.0|
| 2.0| 5.0|
| 0.0| 3.0|
| 3.0| 0.0|
| 0.0| 0.0|
| 3.0| 3.0|
| 4.0| 4.0|
| 0.0| 3.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 4.0| 4.0|
| 3.0| 0.0|
| 2.0| 2.0|
| 1.0| 1.0|
| 7.0| 7.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 3.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 5.0| 4.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 4.0| 4.0|
| 3.0| 3.0|
| 0.0| 0.0|
| 2.0| 2.0|
| 4.0| 4.0|
| 7.0| 7.0|
| 2.0| 2.0|
| 0.0| 0.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 4.0| 4.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 3.0|
| 0.0| 3.0|
| 0.0| 0.0|
+--------------+----------+
only showing top 100 rows
0.764132648114
結論
當然,需要一些時間掌握Keras和Spark的原理和語法,這取決於您來自哪裡。然而,我們也希望您得出結論,一旦您超越了定義模型和預處理資料的艱鉅階段,構建和使用SparkML流水線的業務是非常優雅和有用的。
如果您喜歡您所看到的,請考慮進一步改善對Ceras或Spark的影響。你對這款膝上型電腦有什麼建設性的意見嗎?有什麼要我澄清嗎?無論如何,請隨時與我聯絡。