Spark MLlib
Contents
- Part VI. Advanced Analytics and Machine Learning
- Advanced Analytics and Machine Learning Overview
- 1.A Short Primer on Advanced Analytics
- 2.Spark’s Advanced Analytics Toolkit
- 3.ML in Action
- 4. Deployment Patterns
- Preprocessing and Feature Engineering
- 1.Formatting Models According to Your Use Case
- 2. Transforming Continuous Features
- 3.Categorical Features
- 4.Text Data Transformers
- 5.Feature Manipulation and Selection
- 6.Advanced Topics
- 7. Example: Record Linkage (C2)
- Classification
- Example: Predicting Forest Cover (C4)
- Regression
- Recommendation
- 1.Collaborative Filtering with Alternating Least Squares
- Example: Recommending Music (C3)
- 1. Splitting the Text Files
- 2. Transforming Data with map
- 3. Building the ALS Model and Inferring User Preferences
- 4. Evaluating the ALS Model
- 5. Hyperparameter Tuning
- Unsupervised Learning
- Anomaly Detection in Network Traffic (C5)
- MLlib (legacy, for reference)
- Feature Encoding and Data Preparation
- Advanced Analytics and Machine Learning Overview
Part VI. Advanced Analytics and Machine Learning
Advanced Analytics and Machine Learning Overview
1.A Short Primer on Advanced Analytics
Purpose: deriving insights and making predictions or recommendations
Concepts
Supervised Learning: train a model on historical data containing a label (the dependent variable) to predict the label of new data. Training typically uses gradient descent to adjust the parameters until the model fits.
Classification: predicts a categorical variable, i.e. a value from a discrete, finite set. When each example has exactly one output value, the task is binary or multiclass depending on how many values the categorical variable can take; when an example can have several output values it is multilabel classification.
Regression: predicts a continuous variable, a real number.
Recommendation: suggests items based on the preferences of similar users or on similar items.
Unsupervised Learning: finds patterns in the data; there is no label.
Graph Analytics: studies structures composed of vertices (objects) and edges (relationships between those objects).
The Advanced Analytics Process
- Gathering the relevant data
- Cleaning and inspecting the data to better understand it
- Feature engineering
- Training models
- Comparing and evaluating models
- Using the results, or the model itself, to solve the problem
2.Spark’s Advanced Analytics Toolkit
Overview
Spark provides modules with interfaces for carrying out the Advanced Analytics Process above. Compared with other ML libraries, Spark's is better suited to large volumes of data.
The ml package exposes a DataFrame-based API; the book covers only this one.
The mllib package contains the lower-level APIs and is now in maintenance mode: bugs are fixed but no new features are added. For now, streaming training is only available through mllib.
Concepts
Transformers: functions that transform data, DataFrame => new DataFrame
Estimators: classes with fit and transform; by purpose they are either transformers that must first be initialized with data, or training algorithms.
Low-level data types: Vector
(similar to NumPy arrays) holds doubles and can be sparse (mostly zeros) or dense (every value stored)
//creating vectors
val denseVec = Vectors.dense(1.0, 2.0, 3.0)
val size = 3
val idx = Array(1,2) // locations of non-zero elements in vector
val values = Array(2.0,3.0)
val sparseVec = Vectors.sparse(size, idx, values)
sparseVec.toDense
denseVec.toSparse
//the two matrices below are identical
Matrices.dense(3,3,Array(1,0,0,0,1,0,0,0,1))
Matrices.sparse(3,3,Array(0,1,2,3),Array(0,1,2),Array(1,1,1))//the first array holds column pointers: 0 elements before the first column, 1 element accumulated through the first column, 2 through the second column, and so on
//distributed matrices
RowMatrix
//arithmetic (via Breeze)
breeze.linalg.DenseVector(1,2,3)
breeze.linalg.DenseMatrix(Array(1,2,3), Array(4,5,6))
3.ML in Action
RFormula:
import org.apache.spark.ml.feature.RFormula
"~" separates the target from the terms; "+ 0" (same as "- 1") removes the intercept; ":" denotes an interaction (multiplication of numeric values, or of binarized categorical values); "." means all columns except the target
//load the data
var df = spark.read.json("/data/simple-ml")
df.orderBy("value2").show()
//transform the data
//data fed into the training algorithms must be Double (for labels) or Vector[Double] (for features)
val supervised = new RFormula()
.setFormula("lab ~ . + color:value1 + color:value2")
val fittedRF = supervised.fit(df)
val preparedDF = fittedRF.transform(df)
preparedDF.show()//appends features and label columns to the original DF; these are the default column names expected by Spark's ml algorithms
//split into training and test sets
val Array(train, test) = preparedDF.randomSplit(Array(0.7, 0.3))
//train the model and inspect its predictions (on the training set)
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")
println(lr.explainParams())
val fittedLR = lr.fit(train)
fittedLR.transform(train).select("label", "prediction").show()
//build the pipeline
//transformer or model instances cannot be reused across different pipelines, so the ones above must be created anew
val rForm = new RFormula()
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")
val stages = Array(rForm, lr)
val pipeline = new Pipeline().setStages(stages)//pipeline.stages(0).asInstanceOf[RFormula] retrieves that stage's object; usually you fetch the final model from the last stage, or a StringIndexerModel in order to invert its mapping
//build the hyperparameter grid
val params = new ParamGridBuilder()
.addGrid(rForm.formula, Array(
"lab ~ . + color:value1",
"lab ~ . + color:value1 + color:value2"))
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.addGrid(lr.regParam, Array(0.1, 2.0))
.build()
//build the evaluator
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setRawPredictionCol("prediction")
.setLabelCol("label")
//hyperparameter search with TrainValidationSplit (a cheaper alternative to cross-validation)
val tvs = new TrainValidationSplit()
.setTrainRatio(0.75) // also the default.
.setEstimatorParamMaps(params)
.setEstimator(pipeline)
.setEvaluator(evaluator)
val tvsFitted = tvs.fit(train)
//transform the test set and evaluate areaUnderROC
evaluator.evaluate(tvsFitted.transform(test))
//inspect the model's training history
val trainedPipeline = tvsFitted.bestModel.asInstanceOf[PipelineModel]
val TrainedLR = trainedPipeline.stages(1).asInstanceOf[LogisticRegressionModel]
val summaryLR = TrainedLR.summary
summaryLR.objectiveHistory
//extract the best model's parameters
trainedPipeline.stages.last.extractParamMap
//score and parameters for every candidate model
val paramsAndMetrics = tvsFitted.validationMetrics.
zip(tvsFitted.getEstimatorParamMaps).sortBy(-_._1)
paramsAndMetrics.foreach { case (metric, params) =>
println(metric)
println(params)
println()
}
//save the model
tvsFitted.write.overwrite().save("path")
//load the model with the companion object matching how it was trained: this model came from TrainValidationSplit, so use TrainValidationSplitModel (a CrossValidator run would use CrossValidatorModel); the earlier standalone LogisticRegression would be loaded with LogisticRegressionModel
val model = TrainValidationSplitModel.load("path")
model.transform(test)
//to export models in a portable format such as PMML, see the MLeap project on GitHub
4. Deployment Patterns
- Train offline and use the model for analysis (a good fit for Spark)
- Train offline and store the results in a database; well suited to recommendation
- Train offline and persist the model for serving (not low-latency: starting Spark is expensive)
- Convert the distributed model into a faster single-machine format (Spark can export PMML)
- Train and serve online (with Structured Streaming; suitable for only some ML models)
Preprocessing and Feature Engineering
1.Formatting Models According to Your Use Case
- most classification and regression: a label and features
- recommendation: users, items, and ratings
- unsupervised learning: features
- graph analytics: a vertices DF and an edges DF
//four sample datasets
val sales = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/by-day/*.csv")
.coalesce(5)
.where("Description IS NOT NULL")//Spark對null的處理還在改進
val fakeIntDF = spark.read.parquet("/data/simple-ml-integers")
var simpleDF = spark.read.json("/data/simple-ml")
val scaleDF = spark.read.parquet("/data/simple-ml-scaling")
sales.cache()
//Transformers: transforming the Description column
val tkn = new Tokenizer().setInputCol("Description")
tkn.transform(sales.select("Description")).show(false)
//Estimators
val ss = new StandardScaler().setInputCol("features")
ss.fit(scaleDF).transform(scaleDF).show(false)
//High-Level Transformers
//RFormula: strings are one-hot encoded by default (the label becomes a Double); numeric columns are cast to Double
val supervised = new RFormula()
.setFormula("lab ~ . + color:value1 + color:value2")
supervised.fit(simpleDF).transform(simpleDF).show()
//SQL Transformers
val basicTransformation = new SQLTransformer()
.setStatement("""
SELECT sum(Quantity), count(*), CustomerID
FROM __THIS__
GROUP BY CustomerID
""")
basicTransformation.transform(sales).show()
//VectorAssembler: concatenates all features into one big vector, usually the final feature-preparation step before the model
val va = new VectorAssembler().setInputCols(Array("int1", "int2", "int3"))
va.transform(fakeIntDF).show()
2. Transforming Continuous Features
These transformers apply only to Double columns.
val contDF = spark.range(20).selectExpr("cast(id as double)")
// bucketing: the splits array lists the bucket borders (5 borders => 4 buckets); because the splits are given explicitly, no fit is needed
//the buckets below are [-1.0, 5.0), [5.0, 10.0), [10.0, 250.0), [250.0, 600.0]
val bucketBorders = Array(-1.0, 5.0, 10.0, 250.0, 600.0)//the end points can also be scala.Double.NegativeInfinity / PositiveInfinity
val bucketer = new Bucketizer().setSplits(bucketBorders).setInputCol("id")
//for null or NaN values, handleInvalid must be set: keep those values, error on them, or skip those rows
//QuantileDiscretizer
val bucketer = new QuantileDiscretizer().setNumBuckets(5).setInputCol("id")
//Scaling and Normalization
//StandardScaler: withMean defaults to false and is expensive for sparse data
val sScaler = new StandardScaler().setInputCol("features")
//MinMaxScaler
val minMax = new MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
//MaxAbsScaler: scales each value to between -1 and 1
val maScaler = new MaxAbsScaler().setInputCol("features")
//ElementwiseProduct: no fit needed
val scaleUpVec = Vectors.dense(10.0, 15.0, 20.0)
val scalingUp = new ElementwiseProduct()
.setScalingVec(scaleUpVec)
.setInputCol("features")//如果feature的某row是[1, 0.1, -1],轉換後變為[10, 1.5, -20]
//Normalizer,1,2,3等
val manhattanDistance = new Normalizer().setP(1).setInputCol("features")
There are also more advanced bucketing techniques, such as locality sensitive hashing (LSH); a minimal sketch follows below.
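A minimal LSH sketch, reusing the scaleDF features column; the bucketLength and numHashTables values are illustrative assumptions, not from the book.
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
val brpModel = brp.fit(scaleDF)
brpModel.transform(scaleDF).show(false)
//brpModel.approxSimilarityJoin(scaleDF, scaleDF, 1.5) would return pairs within a Euclidean distance of 1.5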
3.Categorical Features
It is recommended to re-index every categorical variable during preprocessing, just for consistency's sake. Accordingly, fit and transform below are given the whole DF unless noted otherwise.
//StringIndexer: maps each string to a single int value; it also works on non-string columns, which are first converted to strings and then indexed
val lblIndxr = new StringIndexer().setInputCol("lab").setOutputCol("labelInd")
//if transform meets a value not seen during fit, it errors by default; the lines below (valIndexer is another StringIndexer instance) set it to skip the whole row instead
valIndexer.setHandleInvalid("skip")
valIndexer.fit(simpleDF).setHandleInvalid("skip")
//IndexToString: e.g. converting classification output back into strings. Because Spark keeps the metadata, no fit is needed; if the metadata was not preserved, add .setLabels(model.labels)
val labelReverse = new IndexToString().setInputCol("labelInd")
//VectorIndexer: set the maximum number of categories; the transformer below leaves any feature with more than two unique values un-indexed (treated as continuous)
val indxr = new VectorIndexer().setInputCol("features").setOutputCol("idxed")
.setMaxCategories(2)
//OneHotEncoder
val lblIndxr = new StringIndexer().setInputCol("color").setOutputCol("colorInd")
val colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
val ohe = new OneHotEncoder().setInputCol("colorInd")
ohe.transform(colorLab).show()
//plain manual encoding with when/otherwise
df.select(when(col("happy") === true, 1).otherwise(2).as("encoded")).show
//map-based encoding: the function below is actually unrelated to nulls; see also "Transforming Data with map" later. There is no otherwise equivalent here.
df.na.replace("Description", Map("" -> "UNKNOWN"))
4.Text Data Transformers
Two kinds of text: string categorical variables (covered above) and free-form text
//these need no fit
//Tokenizer: splits on whitespace
val tkn = new Tokenizer().setInputCol("Description").setOutputCol("DescOut")
val tokenized = tkn.transform(sales.select("Description"))
tokenized.show(false)
//RegexTokenizer: splits on a pattern
val rt = new RegexTokenizer()
.setInputCol("Description")
.setOutputCol("DescOut")
.setGaps(false)//with gaps set to false the pattern is matched as the tokens themselves rather than used as the delimiter
.setPattern(" ") // simplest expression
.setToLowercase(true)
//StopWordsRemover
val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
val stops = new StopWordsRemover()
.setStopWords(englishStopWords)
.setInputCol("DescOut")
//NGram: the code below turns [Big, Data, Processing, Made] into [Big Data, Data Processing, Processing Made]
val bigram = new NGram().setInputCol("DescOut").setN(2)
//term counts with CountVectorizer
val cv = new CountVectorizer()
.setInputCol("DescOut")
.setOutputCol("countVec")
.setVocabSize(500)
.setMinTF(1)//minimum frequency within the document being transformed for a term to be counted
.setMinDF(2)//minimum number of documents a term must appear in to enter the vocabulary
//in the output below, the second component holds the vocabulary indices of the words (not in their original order) and the third component holds their counts in this row
|[rabbit, night, light] |(500,[150,185,212],[1.0,1.0,1.0]) |
//TF-IDF: HashingTF hashes each term to an index, and IDF then weights terms that appear in fewer documents more highly. Unlike CountVectorizer above, a hashed index cannot be mapped back to the word
val tf = new HashingTF()
.setInputCol("DescOut")
.setOutputCol("TFOut")
.setNumFeatures(10000)
val idf = new IDF()
.setInputCol("TFOut")
.setOutputCol("IDFOut")
.setMinDocFreq(2)
idf.fit(tf.transform(tfIdfIn)).transform(tf.transform(tfIdfIn)).show(false)
//in the output below, the second component holds the hashed term indices and the third the TF-IDF weights
(10000,[2591,4291,4456],[1.0116009116784799,0.0,0.0])
//Word2Vec (embedding-based; the book treats it under deep learning, so only a short sketch follows)
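A minimal Word2Vec sketch on the tokenized DescOut column; vectorSize and minCount here are illustrative assumptions.
import org.apache.spark.ml.feature.Word2Vec
val w2v = new Word2Vec()
.setInputCol("DescOut")
.setOutputCol("wordVec")
.setVectorSize(50)
.setMinCount(0)
val w2vModel = w2v.fit(tokenized)
w2vModel.transform(tokenized).select("DescOut", "wordVec").show(false)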
5.Feature Manipulation and Selection
//PCA
val pca = new PCA().setInputCol("features").setK(2)
pca.fit(scaleDF).transform(scaleDF).show(false)
//Interaction: usually expressed through RFormula
//Polynomial Expansion
val pe = new PolynomialExpansion().setInputCol("features").setDegree(2)
//ChiSqSelector
val chisq = new ChiSqSelector()
.setFeaturesCol("countVec")
.setLabelCol("CustomerID")
.setNumTopFeatures(2)//number of top features to keep (a percentile-based selectorType is also available)
6.Advanced Topics
//Persisting Transformers
val fittedPCA = pca.fit(scaleDF)
fittedPCA.write.overwrite().save("/tmp/fittedPCA")
val loadedPCA = PCAModel.load("/tmp/fittedPCA")
loadedPCA.transform(scaleDF).show()
//custom transformers
class MyTokenizer(override val uid: String)
extends UnaryTransformer[String, Seq[String],
MyTokenizer] with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("myTokenizer"))
val maxWords: IntParam = new IntParam(this, "maxWords",
"The max number of words to return.",
ParamValidators.gtEq(0))
def setMaxWords(value: Int): this.type = set(maxWords, value)
def getMaxWords: Integer = $(maxWords)
override protected def createTransformFunc: String => Seq[String] = (
inputString: String) => {
inputString.split("\\s").take($(maxWords))
}
override protected def validateInputType(inputType: DataType): Unit = {
require(
inputType == StringType, s"Bad input type: $inputType. Requires String.")
}
override protected def outputDataType: DataType = new ArrayType(StringType,
true)
}
// this will allow you to read it back in by using this object.
object MyTokenizer extends DefaultParamsReadable[MyTokenizer]
val myT = new MyTokenizer().setInputCol("someCol").setMaxWords(2)
myT.transform(Seq("hello world. This text won't show.").toDF("someCol")).show()
//another custom transformer, built directly on the Transformer base class
class ConfigurableWordCount(override val uid: String) extends Transformer {
final val inputCol= new Param[String](this, "inputCol", "The input column")
final val outputCol = new Param[String](this, "outputCol", "The output column")
def setInputCol(value: String): this.type = set(inputCol, value)
def setOutputCol(value: String): this.type = set(outputCol, value)
//constructor
def this() = this(Identifiable.randomUID("configurablewordcount"))
//copy of the current stage; defaultCopy is usually sufficient
def copy(extra: ParamMap): ConfigurableWordCount = {
defaultCopy(extra)
}
//produce the output schema (StructType); remember to validate the input type first
override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(
s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}
def transform(df: Dataset[_]): DataFrame = {
val wordcount = udf { in: String => in.split(" ").size }
df.select(col("*"), wordcount(df.col($(inputCol))).as($(outputCol)))
}
}
For custom estimators and models, see CustomPipeline.scala from High Performance Spark; you can also add some caching code there. org.apache.spark.ml.Predictor and org.apache.spark.ml.classification.Classifier are sometimes more convenient because they handle the schema transformation automatically; the latter additionally provides rawPredictionColumn and getNumClasses. Regression and clustering, however, only have the Estimator interface to work with.
7. Example: Record Linkage (C2)
1. Transposing the summary table
//parsed is the linkage DataFrame
val summary = parsed.describe()//in the summary table the header row holds the column names and the first column holds the metric names
val schema = summary.schema//StructType(StructField(summary,StringType,true)...): every field is a StringType
val longDF = summary.flatMap(row => {
val metric = row.getString(0)
(1 until row.size).map(i => {
(metric, schema(i).name, row.getString(i).toDouble)
})
}).toDF("metric", "field", "value")
//the first 5 rows of the result; more metrics follow below
+------+------------+---------+
|metric| field| value|
+------+------------+---------+
| count| id_1|5749132.0|
| count| id_2|5749132.0|
| count|cmp_fname_c1|5748125.0|
| count|cmp_fname_c2| 103698.0|
| count|cmp_lname_c1|5749132.0|
+------+------------+---------+
//pivot
val wideDF = longDF
.groupBy("field")
.pivot("metric")//passing the values explicitly, e.g. .pivot("metric", Seq("count", "mean", "stddev", "min", "max")), avoids an extra pass and is needed on older Spark versions
.sum()//each (field, metric) pair is unique, so sum() simply passes through the single value
//the two steps above can be wrapped in one function: write a Pivot.scala in the IDE (the code below omits imports such as DataFrame and sql.functions), load the class, and it can then be applied to any .describe() output
//inside the function the summary above is renamed desc
def pivotSummary(desc: DataFrame): DataFrame = {
val schema = desc.schema
import desc.sparkSession.implicits._
// ...the body repeats the flatMap and groupBy/pivot steps above, applied to desc...
}
//inspect the result
wideDF.select("field", "count", "mean").show()
In the exercise above, row.getString(i) returns a java.lang.String, which has no toDouble method of its own; the call works through a Scala implicit conversion that wraps the String in StringOps (a Scala class) and invokes that class's toDouble. Implicit conversions let us add functionality to core classes, but they can make it hard to tell where a method comes from.
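A tiny illustration of that implicit conversion (Scala 2.x, where Predef.augmentString does the wrapping):
val s: String = "1.5"
val d: Double = s.toDouble //expands to augmentString(s).toDouble, i.e. new StringOps(s).toDouble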
2. Feature selection
A plain SQL join would arguably be clearer here; the code below adds a little extra and can be treated as a reading exercise. Its main purpose is to compare two .describe() tables, one for the matches and one for the misses.
matchSummaryT.createOrReplaceTempView("match_desc")
missSummaryT.createOrReplaceTempView("miss_desc")
spark.sql("""
SELECT a.field, a.count + b.count total, a.mean - b.mean delta
FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
WHERE a.field NOT IN ("id_1", "id_2")
ORDER BY delta DESC, total DESC
""").show()
//the first two rows of the result; every field takes values in the range 0 to 1
+------------+---------+--------------------+
| field| total| delta|
+------------+---------+--------------------+
| cmp_plz|5736289.0| 0.9563812499852176|
|cmp_lname_c2| 2464.0| 0.8064147192926264|
+------------+---------+--------------------+
//in the table above, total indicates how much data is missing and delta how strongly the feature separates matches from misses
3. Cross-tabulation analysis
The example data is fairly simple, so the book's approach is mainly for demonstration; a simpler implementation appears at the end.
Create a case class and convert the DataFrame[Row] into a Dataset[MatchData]; this makes manipulating the elements of the structured table more flexible, at the cost of some of the DataFrame's efficiency.
//1. define the case class needed for the conversion
case class MatchData(//some fields omitted below
id_1: Int,
cmp_fname_c1: Option[Double],
cmp_plz: Option[Int],
is_match: Boolean
)
val matchData = parsed.as[MatchData]
//2. define a case class that provides a + method
case class Score(value: Double) {
def +(oi: Option[Int]) = {
Score(value + oi.getOrElse(0))
}
}
//3. sum up the features judged useful (based on the earlier feature-selection analysis) to produce a score
def scoreMatchData(md: MatchData): Double = {
(Score(md.cmp_lname_c1.getOrElse(0.0)) + md.cmp_plz +
md.cmp_by + md.cmp_bd + md.cmp_bm).value
}
//4. pull out the score and the label
val scored = matchData.map {
md => (scoreMatchData(md), md.is_match)
}.toDF("score", "is_match")
//5. build the cross-tab
def crossTabs(scored: DataFrame, t: Double): DataFrame = {
scored.selectExpr(s"score >= $t as above", "is_match")
.groupBy("above")
.pivot("is_match")
.count()
}
crossTabs(scored, 2.0).show()
+-----+-----+-------+
|above| true| false|
+-----+-----+-------+
| true|20931| 596414|
|false| null|5131787|
+-----+-----+-------+
//the same thing can be done directly on the DataFrame, without converting to a Dataset
val scored = parsed
.na.fill(0, Seq("cmp_lname_c1",...))//without the fill, any row with a null in these columns would get a null score
.withColumn("score", expr("cmp_lname_c1 + ..."))
.select("score", "is_match")
Classification
1. The four most commonly used models
Model scalability
Model | Features count | Training examples | Output classes |
---|---|---|---|
Logistic regression | 1 to 10 million | No limit | Features x Classes < 10 million |
Decision trees | 1,000s | No limit | Features x Classes < 10,000s |
Random forest | 10,000s | No limit | Features x Classes < 100,000s |
Gradient-boosted trees | 1,000s | No limit | Features x Classes < 10,000s |
Model parameters
Logistic regression | Decision trees | Random forest | GBT (currently binary only) |
---|---|---|---|
family: multinomial or binary | maxDepth: default 5 | numTrees | lossType: only logistic loss is supported |
elasticNetParam: 0–1; 0 is pure L2, 1 is pure L1 | maxBins: number of bins used to discretize a feature, default 32 | featureSubsetStrategy (number of features considered per split): auto, all, sqrt, log2, etc. | maxIter: default 100 |
fitIntercept: boolean; usually set when the data is not normalized | impurity: "entropy" or "gini" (default) | checkpointInterval: as for decision trees | stepSize: 0–1, default 0.1 |
regParam: >= 0 | minInfoGain: minimum information gain, default 0 | thresholds | checkpointInterval: as for decision trees |
standardization: boolean | minInstancePerNode: default 1 | | thresholds |
maxIter: default 100; not the first parameter to tune | checkpointInterval: -1 disables checkpointing, 10 checkpoints every 10 iterations; also requires checkpointDir and useNodeIdCache=true | | |
tol: default 1.0E-6; not the first parameter to tune | thresholds | | |
weightCol | | | |
threshold: 0–1, the probability cutoff for predicting a class (trades off false positives against false negatives) | | | |
thresholds: as above, but for multiclass | | | |
//some additional notes; the code below is mainly for logistic regression
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3)
.setElasticNetParam(0.8)
//inspect the parameters
println(lr.explainParams())
//inspect the fitted coefficients; for multiclass use lrModel.coefficientMatrix and lrModel.interceptVector
println(lrModel.coefficients)
println(lrModel.intercept)
//the model summary: not yet available for multiclass logistic regression; other models may also expose a summary worth calling to see what it offers. It is not recomputed.
val summary = lrModel.summary
summary.residuals.show()//the residuals (label minus prediction); note residuals, rootMeanSquaredError and r2 are members of the linear regression summary rather than the logistic one
summary.rootMeanSquaredError
summary.r2
val bSummary = summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(bSummary.areaUnderROC)
bSummary.roc.show()
bSummary.pr.show()
summary.objectiveHistory//the loss at each iteration
2. Others
Naive Bayes:
Two flavors: the Bernoulli model, where indicator variables represent the existence of a term in a document; or the multinomial model, where the total counts of terms are used.
All input features must be non-negative.
Parameter notes (a minimal sketch follows after this list):
modelType: "bernoulli" or "multinomial"
weightCol
smoothing: default 1
thresholds
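A minimal NaiveBayes sketch, reusing the bInput DataFrame (label/features columns) that appears in the metrics example below; the parameter values are illustrative.
import org.apache.spark.ml.classification.NaiveBayes
val nb = new NaiveBayes()
.setModelType("multinomial")//or "bernoulli"
.setSmoothing(1.0)
val nbModel = nb.fit(bInput)//remember: the features must be non-negative
nbModel.transform(bInput).select("label", "prediction").show()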
Evaluators for Classification and Automating Model Tuning
BinaryClassificationEvaluator: "areaUnderROC" and "areaUnderPR"
MulticlassClassificationEvaluator: "f1", "weightedPrecision", "weightedRecall", and "accuracy"
Detailed Evaluation Metrics
//the approach is similar for the three classification metrics classes
val out = model.transform(bInput)
.select("prediction", "label")
.rdd.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double]))
val metrics = new BinaryClassificationMetrics(out)
metrics.areaUnderPR
metrics.areaUnderROC
println("Receiver Operating Characteristic")
metrics.roc.toDF().show()
One-vs-Rest Classifier
See the Spark documentation for a full example; a minimal sketch follows below.
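A minimal OneVsRest sketch (the shape follows the Spark documentation example); multiClassTrain and multiClassTest are placeholder DataFrames with label/features columns, not datasets from the book.
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
val base = new LogisticRegression().setMaxIter(10).setTol(1E-6)
val ovr = new OneVsRest().setClassifier(base)
val ovrModel = ovr.fit(multiClassTrain)//one binary model is trained per class
ovrModel.transform(multiClassTest).select("label", "prediction").show()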
Example: Predicting Forest Cover (C4)
Decision trees are robust to outliers (extreme or erroneous values).
Building trees is memory-intensive.
One-hot encoding lets the model consider each category on its own (at a higher memory cost); with a single categorical column the model may lump some categories into groups instead of considering each one, though accuracy is not necessarily worse.
//build the schema
val colNames = Seq(
"Elevation", "Aspect", "Slope",
...
)++(
(0 until 4).map(i => s"Wilderness_Area_$i")//note this way of building a collection (an IndexedSeq)
) ++ Seq("Cover_Type")
val data = dataWithoutHeader.toDF(colNames:_*)//one way of attaching column names
.withColumn("Cover_Type", $"Cover_Type".cast("double"))
//assemble the feature Vector
val inputCols = trainData.columns.filter(_ != "Cover_Type")
val assembler = new VectorAssembler().
setInputCols(inputCols).
setOutputCol("featureVector")
val assembledTrainData = assembler.transform(trainData)
//the model
//inspect the tree's decision logic
model.toDebugString
//print featureImportances
model.featureImportances.toArray.zip(inputCols).
sorted.reverse.foreach(println)
//compute the confusion matrix with DataFrames; Spark's built-in version requires RDDs
val confusionMatrix = predictions
.groupBy("Cover_Type")
.pivot("prediction", (1 to 7))//1 to 7是對應prediction裏面的內容的。如果prediction裏面沒有7,也可以,但該列全是null
.count()
.na.fill(0.0)
.orderBy("Cover_Type")
//build a random-guessing classifier as a baseline (its use is sketched after this function)
def classProbabilities(data: DataFrame): Array[Double] = {
val total = data.count()
data.groupBy("Cover_Type").count()
.orderBy("Cover_Type")
.select("count").as[Double]
.map(_ / total).collect()
}
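A usage sketch for classProbabilities: the accuracy a purely random guesser would achieve, as a baseline. It assumes a held-out cvData split alongside trainData, as in the book's chapter.
val trainPriors = classProbabilities(trainData)
val cvPriors = classProbabilities(cvData)
val randomAccuracy = trainPriors.zip(cvPriors).map {
case (trainProb, cvProb) => trainProb * cvProb
}.sum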
2. Converting one-hot columns back into a single categorical feature
def unencodeOneHot(data: DataFrame): DataFrame = {
//the column names to convert
val wildernessCols = (0 until 4).map(i => s"Wilderness_Area_$i").toArray
//assemble the values of those columns into a single vector
val wildernessAssembler = new VectorAssembler()
.setInputCols(wildernessCols)
.setOutputCol("wilderness")
//take the index of the 1.0 to turn the one-hot vector into a categorical number
val unhotUDF = udf((vec: Vector) => vec.toArray.indexOf(1.0).toDouble)
//transform the data
val withWilderness = wildernessAssembler.transform(data)
.drop(wildernessCols:_*)
.withColumn("wilderness", unhotUDF($"wilderness"))
}
//after the conversion, add a VectorIndexer step to the pipeline so that Spark treats these low-cardinality columns as categorical features; note this assumes all 4 categories appear at least once in the data being processed
val indexer = new VectorIndexer()
.setMaxCategories(4)
.setInputCol("featureVector")
.setOutputCol("indexedVector")
val pipeline = new Pipeline().setStages(Array(assembler, indexer, classifier))
Regression
Model scalability
Model | Number features | Training examples |
---|---|---|
Linear regression | 1 to 10 million | No limit |
Generalized linear regression | 4,096 | No limit |
Isotonic regression | N/A | Millions |
Decision trees | 1,000s | No limit |
Random forest | 10,000s | No limit |
Gradient-boosted trees | 1,000s | No limit |
Survival regression | 1 to 10 million | No limit |
Generalized Linear Regression
The supported links below specify the relationship between the linear predictor and the mean of the distribution function.
Family | Response type | Supported links |
---|---|---|
Gaussian | Continuous | Identity*, Log, Inverse |
Binomial | Binary | Logit*, Probit, CLogLog |
Poisson | Count | Log*, Identity, Sqrt |
Gamma | Continuous | Inverse*, Identity, Log |
Tweedie | Zero-inflated continuous | Power link function |
Parameters:
family and link: see the table above
solver: currently only irls (iteratively reweighted least squares) is supported
variancePower: the power in the Tweedie variance function, characterizing the relationship between the variance and the mean of the distribution; default 0; only used with the Tweedie family
linkPower: the index in the power link function; only used with the Tweedie family
linkPredictionCol: a column name; when set, the link prediction is output to that column as well
Advanced Methods (skipped)
Survival Regression (Accelerated Failure Time)
Isotonic Regression: fits a monotonically non-decreasing function, used when the response is expected to increase monotonically; a minimal sketch follows below
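A minimal IsotonicRegression sketch; it assumes a DataFrame df with the default label/features columns, as elsewhere in this section.
import org.apache.spark.ml.regression.IsotonicRegression
val ir = new IsotonicRegression().setIsotonic(true)//true (default) fits a non-decreasing function, false a non-increasing one
val irModel = ir.fit(df)
println(irModel.boundaries)//the boundaries of the fitted step function
irModel.transform(df).select("label", "prediction").show()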
Evaluators and Automating Model Tuning
//Evaluators
val glr = new GeneralizedLinearRegression()
.setFamily("gaussian")
.setLink("identity")
val pipeline = new Pipeline().setStages(Array(glr))
val params = new ParamGridBuilder().addGrid(glr.regParam, Array(0, 0.5, 1))
.build()
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setPredictionCol("prediction")
.setLabelCol("label")
val cv = new CrossValidator()//rarely used on very large datasets; too time-consuming
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(params)
.setNumFolds(2) // should always be 3 or more but this dataset is small
val model = cv.fit(df)
//Metrics
val out = model.transform(df)
.select("prediction", "label")
.rdd.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double]))
val metrics = new RegressionMetrics(out)
println(s"MSE = ${metrics.meanSquaredError}")
println(s"RMSE = ${metrics.rootMeanSquaredError}")
println(s"R-squared = ${metrics.r2}")
println(s"MAE = ${metrics.meanAbsoluteError}")
println(s"Explained variance = ${metrics.explainedVariance}")
Recommendation
1.Collaborative Filtering with Alternating Least Squares
Estimates each user's ranking of items based only on the user's past interactions with items, not on attributes of the users or items. It needs three columns: user ID, item ID, and rating. The rating can be explicit (a score given by the user) or implicit (the strength of the user's interaction with the item).
The algorithm tends to favor popular items and items with a lot of interaction information, and it suffers from a cold start problem for new items or users.
In production, recommendations for all users are usually precomputed and stored in a NoSQL store to serve them in real time, which wastes storage (most users will not need a recommendation on any given day), while computing them on demand takes a few seconds. Oryx 2 may be one way around this.
Casting some of the values to Integer or Int is more efficient.
Parameters:
rank: the number of latent factors, default 10
alpha: for implicit feedback, the relative weight of observed versus unobserved interactions; the higher it is, the more weight recorded interactions get. Default 1; 40 is also a reasonable choice
regParam: default 0.1
implicitPrefs: boolean, whether the feedback is implicit; default false
nonnegative: default false; applies non-negativity constraints to the least-squares problem
numUserBlocks: default 10
numItemBlocks: default 10
maxIter: default 10
checkpointInterval
seed
coldStartStrategy: how the model should handle users or items it did not see during training; can only be set to drop or nan
Blocks should generally contain one to five million ratings each; with fewer ratings than that, adding more blocks does not improve performance.
val ratings = spark.read.textFile("/data/sample_movielens_ratings.txt")
.selectExpr("split(value , ‘::‘) as col")
.selectExpr(
"cast(col[0] as int) as userId",
"cast(col[1] as int) as movieId",
"cast(col[2] as float) as rating",
"cast(col[3] as long) as timestamp")
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
println(als.explainParams())
val alsModel = als.fit(training)
val predictions = alsModel.transform(test)
//results: look at the top-10 recommendations
alsModel.recommendForAllUsers(10)
.selectExpr("userId", "explode(recommendations)").show()
alsModel.recommendForAllItems(10)
.selectExpr("movieId", "explode(recommendations)").show()
//evaluation: first set the cold-start strategy to drop
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
//Regression Metrics
val regComparison = predictions.select("rating", "prediction")
.rdd.map(x => (x.getFloat(0).toDouble,x.getFloat(1).toDouble))
val metrics = new RegressionMetrics(regComparison)
metrics.rootMeanSquaredError//same as the root-mean-square error above
//Ranking Metrics: the evaluation below ignores the predicted values themselves and asks whether ALS recommends items rated above some threshold
//below, items with a rating greater than 2.5 are treated as good
val perUserActual = predictions
.where("rating > 2.5")
.groupBy("userId")
.agg(expr("collect_set(movieId) as movies"))
//below, collect the items ALS recommends
val perUserPredictions = predictions
.orderBy(col("userId"), col("prediction").desc)
.groupBy("userId")
.agg(expr("collect_list(movieId) as movies"))
//join the two datasets above and keep only the top 15 recommendations
val perUserActualvPred = perUserActual.join(perUserPredictions, Seq("userId"))
.map(row => (
row(1).asInstanceOf[Seq[Integer]].toArray,
row(2).asInstanceOf[Seq[Integer]].toArray.take(15)
))
//compute the mean average precision of the recommendations
val ranks = new RankingMetrics(perUserActualvPred.rdd)
ranks.meanAveragePrecision
ranks.precisionAt(5)//precision over the top 5 recommendations
Latent-factor models: explain the large number of interactions between users and items through a relatively small number of unobserved, underlying reasons.
Matrix factorization model: imagine a grid whose rows are users and columns are items, with a cell set to 1 when that user and item have interacted; this is an x * y matrix A. Following the latent-factor idea, users and items can be linked through, say, k product types (electronics, books, snacks, ...), so A is factored into an x * k matrix and a y * k matrix (multiplying one by the other's transpose roughly reconstructs A, though never exactly). If the rank k of these factor matrices is too low, the reconstruction of the original x * y matrix is poor.
The goal is then to solve for X (the x * k user matrix) from the known A (the recorded user-item interactions) and Y (the y * k item matrix, in fact also unknown and usually initialized randomly). Since an exact solution is impossible, we can only minimize the difference, which is why it is called Least Squares; "alternating" refers to solving for X given A and Y, then for Y given A and X, back and forth. A toy sketch of one such half-step follows below.
The model's user-feature and product-feature matrices are quite large (number of users x number of factors, and number of items x number of factors).
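A toy Breeze sketch of one ALS half-step with k = 2 latent factors; it is purely illustrative and not Spark's implementation.
import breeze.linalg.{DenseMatrix, pinv}
val A = DenseMatrix((1.0, 0.0, 1.0), (0.0, 1.0, 1.0))//2 users x 3 items interaction matrix
val Y = DenseMatrix.rand(3, 2)//item factors, randomly initialized
val X = A * Y * pinv(Y.t * Y)//least-squares solve for the user factors with Y held fixed
val approxA = X * Y.t//approximate reconstruction of A; the next half-step would re-solve Y with X fixed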
Example: Recommending Music (C3)
1. Splitting the text files
//rawUserArtistData holds the raw text lines; a sample row: "1000002 1 55"
val userArtistDF = rawUserArtistData.map { line =>
val Array(user, artist, _*) = line.split(' ') //_* lets the Array pattern accept more than the named elements
(user.toInt, artist.toInt)//if the values are small, Int is more efficient; after converting, check min and max to make sure nothing exceeds the range and that there are no negative values
}.toDF("user", "artist")
//split off the ID from the corresponding artist name; a sample row: "122 app"
//the code below is not robust: it cannot handle rows without a tab separator or blank rows
rawArtistData.map { line =>
val (id, name) = line.span(_ != '\t') //span breaks before the first tab (keeping the tab in the second part); if there is no tab it breaks at the end (still returning a (String, String), with an empty second part). Alternatively use val Array(..) = .split("\t", 2): a limit of 2 splits into two pieces, 0 splits on every tab
(id.toInt, name.trim)
}.count()
//this version is better (the try/catch below could even absorb the empty-name case and save an if, but that is less safe)
val artistByID = rawArtistData.flatMap { line =>
val (id, name) = line.span(_ != '\t')
if (name.isEmpty) {
None
}else{
try {
Some(id.toInt, name.trim)//adding an extra pair of parentheses here gives the same result
} catch {
case _: NumberFormatException => None
}
}
}.toDF("id", "name")
//turn the alias IDs and their canonical IDs into a Map; a sample row: "123 22"
val artistAlias = rawArtistAlias.flatMap { line =>
val Array(artist, alias) = line.split('\t')
if (artist.isEmpty) {
None
}else{
Some((artist.toInt, alias.toInt))
}
}.collect().toMap
artistByID.filter($"id" isin (1208690, 1003926)).show()
+-------+----------------+
| id| name|
+-------+----------------+
|1208690|Collective Souls|
|1003926| Collective Soul|
+-------+----------------+
2. Transforming data with map
//the code below maps plays recorded under an alias ID to the artist's canonical ID
def buildCounts(
rawUserArtistData: Dataset[String],
bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = {
rawUserArtistData.map { line =>
val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
val finalArtistID =
bArtistAlias.value.getOrElse(artistID, artistID)
(userID, finalArtistID, count)
}.toDF("user", "artist", "count")
}
val bArtistAlias = spark.sparkContext.broadcast(artistAlias)
val trainData = buildCounts(rawUserArtistData, bArtistAlias)
trainData.cache()//cache the transformed training data in memory; otherwise ALS recomputes it every time it needs the data
3. Building the ALS model and inferring user preferences
val model = new ALS()
.setSeed(Random.nextLong())//without this the same default seed is used; the same applies to other Spark MLlib algorithms
.setImplicitPrefs(true)
.setRank(10)
.setRegParam(0.01)
.setAlpha(1.0)
.setMaxIter(5)
.setUserCol("user")
.setItemCol("artist")
.setRatingCol("count")
.setPredictionCol("prediction")
.fit(trainData)
//inspecting the results; treat the two snippets below as reading exercises
//1. see which items a given user has already interacted with
val userID = 2093760
val existingArtistIDs = trainData
.filter($"user" === userID)
.select("artist").as[Int].collect()
artistByID.filter($"id" isin (existingArtistIDs:_*)).show()//註意這個_*用法
//2.將推薦的商品排名,並取前howMany個。並沒有把用戶接觸過的商品過濾掉
def makeRecommendations(
model: ALSModel,
userID: Int,
howMany: Int): DataFrame = {
val toRecommend = model.itemFactors
.select($"id".as("artist"))
.withColumn("user", lit(userID))
model.transform(toRecommend)
.select("artist", "prediction")
.orderBy($"prediction".desc)
.limit(howMany)
}
val topRecommendations = makeRecommendations(model, userID, 5)
//in Spark 2.2+, the code below returns the top 10 for every user directly
alsModel.recommendForAllUsers(10)
.selectExpr("userId", "explode(recommendations)").show()
//infer the user's tastes from the names of the recommended items
//extract the IDs of the recommended items
val recommendedArtistIDs =
topRecommendations.select("artist").as[Int].collect()
//look up the names for those IDs
artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()
4. Evaluating the ALS model
The assumption is that more interaction means stronger preference, even though some of a user's past interactions were never recorded and a little-interacted-with item is not necessarily a bad recommendation.
//the book evaluates with its own mean AUC implementation
//first split into training and validation sets and cache them; the CrossValidator-plus-pipeline approach from the other chapters is more practical
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()
//even a variable that is expensive to compute but not large is broadcast here
val allArtistIDs = allData.select("artist").as[Int].distinct().collect()
val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)
//re-run the model from step 3
//areaUnderCurve below is the book's own helper, available in its GitHub repository
areaUnderCurve(cvData, bAllArtistIDs, model.transform)
5. Hyperparameter tuning
//with no suitable evaluator and no pipeline, the hyperparameter grid is written out by hand here
val evaluations =
for (rank <- Seq(5, 30);
regParam <- Seq(1.0, 0.0001);
alpha <- Seq(1.0, 40.0))
yield {
val model = new ALS().
...//parameters omitted
val auc = areaUnderCurve(cvData, bAllArtistIDs, model.transform)
model.userFactors.unpersist()//unpersist as soon as the evaluation is done
model.itemFactors.unpersist()
(auc, (rank, regParam, alpha))
}
//print the results
evaluations.sorted.reverse.foreach(println)
println(s"$userID -> ${recommendedArtists.mkString(", ")}")
Some further notes on the example:
It never checks whether the value ranges are plausible; for example, some play counts exceed what is physically possible (33 years spent listening to a single artist).
It does not handle missing or meaningless values, such as the unknown artist.
2. Frequent Pattern Mining (see the official example; a minimal sketch follows below)
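A minimal FPGrowth sketch in the ml API (Spark 2.2+); the shape follows the official example and the transactions here are made-up data.
import org.apache.spark.ml.fpm.FPGrowth
val transactions = Seq("1 2 5", "1 2 3 5", "1 2").toDF("raw")
.selectExpr("split(raw, ' ') as items")
val fp = new FPGrowth().setItemsCol("items").setMinSupport(0.5).setMinConfidence(0.6)
val fpModel = fp.fit(transactions)
fpModel.freqItemsets.show()//frequent itemsets and their counts
fpModel.associationRules.show()//rules with antecedent, consequent and confidence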
Unsupervised Learning
Model scalability
Model | Statistical recommendation | Computation limits | Training examples |
---|---|---|---|
k-means | 50 to 100 maximum | Features x clusters < 10 million | No limit |
Bisecting k-means | 50 to 100 maximum | Features x clusters < 10 million | No limit |
GMM | 50 to 100 maximum | Features x clusters < 10 million | No limit |
LDA | An interpretable number | 1,000s of topics | No limit |
k-means | Bisecting k-means | GMM | LDA (skipped for now) |
---|---|---|---|
k | k | k | k: default 10 |
initMode: random or the default k-means|| | minDivisibleClusterSize: default 1 | | |
initSteps: default 2, the number of steps of the k-means|| initialization | | | |
maxIter: default 20 | maxIter: default 20 | maxIter: default 100 | |
tol: default 0.001 (smaller values let training keep going until the centroids barely move) | | tol: default 0.01, also bounded by maxIter | |
Anomaly Detection in Network Traffic (C5)
//see how the clusters line up with the labels
val withCluster = pipelineModel.transform(numericOnly)
withCluster.select("cluster", "label").
groupBy("cluster", "label").count().
orderBy($"cluster", $"count".desc).
show(25)
1. Evaluating k-means (elbow point or entropy)
//unsupervised learning has no ready-made evaluator, so the grid search also has to be computed by hand
def clusteringScore0(data: DataFrame, k: Int): Double = {
val assembler = new VectorAssembler()
.setInputCols(data.columns.filter(_ != "label"))
.setOutputCol("featureVector")
val kmeans = new KMeans()
.setSeed(Random.nextLong())
.setK(k)
.setPredictionCol("cluster")
.setFeaturesCol("featureVector")
val pipeline = new Pipeline().setStages(Array(assembler, kmeans))
val kmeansModel = pipeline.fit(data).stages.last.asInstanceOf[KMeansModel]
kmeansModel.computeCost(assembler.transform(data)) / data.count()
}
//look for a suitable k at the elbow point
(20 to 100 by 20).map(k => (k, clusteringScore0(numericOnly, k))).
foreach(println)
//define how entropy is computed
def entropy(counts: Iterable[Int]): Double = {
val values = counts.filter(_ > 0)
val n = values.map(_.toDouble).sum
values.map { v =>
val p=v/n
-p * math.log(p)
}.sum
}
//compute the weighted entropy score
val clusterLabel = pipelineModel.transform(data).
select("cluster", "label").as[(Int, String)]
val weightedClusterEntropy = clusterLabel.
groupByKey { case (cluster, _) => cluster }.
mapGroups { case (_, clusterLabels) =>
val labels = clusterLabels.map { case (_, label) => label }.toSeq
val labelCounts = labels.groupBy(identity).values.map(_.size)
labels.size * entropy(labelCounts)
}.collect()
weightedClusterEntropy.sum / data.count()
2. Converting categorical variables to one-hot
//non-numeric features cannot be used with k-means; convert the categorical features to one-hot vectors
//the column name can be passed straight to the function, with no need to name a particular table...
def oneHotPipeline(inputCol: String): (Pipeline, String) = {
val indexer = new StringIndexer().
setInputCol(inputCol).
setOutputCol(inputCol + "_indexed")
val encoder = new OneHotEncoder().
setInputCol(inputCol + "_indexed").
setOutputCol(inputCol + "_vec")
val pipeline = new Pipeline().setStages(Array(indexer, encoder))
(pipeline, inputCol + "_vec")
}
3. Finding anomalies
val clustered = pipelineModel.transform(data)
val threshold = clustered.
select("cluster", "scaledFeatureVector").as[(Int, Vector)].
map { case (cluster, vec) => Vectors.sqdist(centroids(cluster), vec) }.
orderBy($"value".desc).take(100).last
val originalCols = data.columns
val anomalies = clustered.filter { row =>
val cluster = row.getAs[Int]("cluster")
val vec = row.getAs[Vector]("scaledFeatureVector")
Vectors.sqdist(centroids(cluster), vec) >= threshold }.select(originalCols.head, originalCols.tail:_*)//select("*")
anomalies.first()
The distance function in this example could be changed to the Mahalanobis distance, but Spark does not provide it yet.
Using a Gaussian mixture model, or DBSCAN (not implemented in Spark), is another option; a minimal GMM sketch follows below.
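A minimal GaussianMixture sketch on the scaled features from the pipeline above; k, the output column names and the choice to fit on clustered are illustrative assumptions.
import org.apache.spark.ml.clustering.GaussianMixture
val gmm = new GaussianMixture()
.setK(10)
.setFeaturesCol("scaledFeatureVector")
.setPredictionCol("gmmCluster")
.setProbabilityCol("gmmProb")
val gmmModel = gmm.fit(clustered)
gmmModel.gaussians.foreach(g => println(g.mean))//the mean of each fitted Gaussian component
gmmModel.transform(clustered).select("gmmCluster", "label").show()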
MLlib (legacy, for reference)
MLlib's supervised algorithms use labeled points and its unsupervised ones use vectors; note these differ from the identically named Scala and spark.ml classes. Vectors.fromML converts ml vectors into mllib ones (a small sketch follows after the imports below).
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS,
LogisticRegressionModel}
import org.apache.spark.mllib.linalg.{Vector => SparkVector}//renamed to avoid confusion
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature._
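A small sketch of the fromML conversion mentioned above; MLVectors is just a local rename to avoid clashing with the mllib Vectors import.
import org.apache.spark.ml.linalg.{Vectors => MLVectors}
val mlVec = MLVectors.dense(1.0, 2.0, 3.0)
val mllibVec: SparkVector = org.apache.spark.mllib.linalg.Vectors.fromML(mlVec)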
Feature encoding and data preparation
MLlib provides feature selection and scaling.
//numeric data
Vectors.dense() / Vectors.sparse()
//for text data
//HashingTF: if you need to keep information beyond the HashingTF output itself, pair it up as below
def toVectorPerserving(rdd: RDD[RawPanda]): RDD[(RawPanda, SparkVector)] = {
val ht = new HashingTF()
rdd.map{panda =>
val textField = panda.pt
val tokenizedTextField = textField.split(" ").toIterable
(panda, ht.transform(tokenizedTextField))
}
}
//Word2Vec
def word2vecTrain(rdd: RDD[String]): Word2VecModel = {
// Tokenize our data
val tokenized = rdd.map(_.split(" ").toIterable)
val wv = new Word2Vec()
wv.fit(tokenized)
}
//preparing the training data
//supervised: LabeledPoint takes a double label and a vector of features
LabeledPoint(booleanToDouble(rp.happy), Vectors.dense(combined))
//build a map for text labels
val distinctLabels: Array[T] = rdd.distinct().collect()
distinctLabels.zipWithIndex.map{case (label, x) => (label, x.toDouble)}.toMap
//scaling and selection
//training and prediction
//saving models
//Saveable(internal format)
model.save()
SpecificModelClass.load()//the companion object of the concrete model class, e.g. LogisticRegressionModel.load(sc, path)
//PMMLExportable: Spark can produce PMML but cannot read it back in directly
model.toPMML()
Additional notes
1. It is best to set every parameter explicitly, since defaults may change between versions.
References:
Spark: The Definitive Guide
High Performance Spark
Advanced Analytics with Spark, 2nd Edition