spark ml 聚類原始碼筆記一
首先是引數
k : 聚類數,預設2
initMode : 初始化演算法的引數,可以是RANDOM或K_MEANS_PARALLEL,RANDOM是隨機選擇初始聚類中心,K_MEANS_PARALLEL是使用演算法選擇初始聚類中心,也是預設情況
initSteps : K_MEANS_PARALLEL方法迭代步數,預設5
接下來是一些重要的方法
private[clustering] def predict(features: Vector): Int = parentModel.predict(features)//predict預測新的屬性屬於哪個類
def clusterCenters: Array[Vector] = parentModel.clusterCenters//列出最終的聚類中心
def computeCost(dataset: DataFrame): Double = {//計算距離平方的總和
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
parentModel.computeCost(data)//主要是呼叫computeCost方法
}
下面是聚類演算法預設的引數
setDefault(//2類,迭代20次,初始化演算法為K_MEANS_PARALLEL,該演算法迭代5次,收斂引數0.0001
k -> 2,
maxIter -> 20,
initMode -> MLlibKMeans.K_MEANS_PARALLEL,
initSteps -> 5,
tol -> 1e-4)
呼叫fit,填充引數
override def fit(dataset: DataFrame): KMeansModel = {
val rdd = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
val algo = new MLlibKMeans()
.setK($(k))
.setInitializationMode($(initMode))
.setInitializationSteps($(initSteps))
.setMaxIterations($(maxIter))
.setSeed($(seed))
.setEpsilon($(tol))
val parentModel = algo.run(rdd)//最重要的就是這裡,algo是MLlibKMeans類,傳入要聚類的rdd,呼叫run方法,得到模型,下面我們就看下這個是如何執行的
val model = new KMeansModel(uid, parentModel)
copyValues(model)
}
run方法
def run(data: RDD[Vector]): KMeansModel = {
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data is not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}
// Compute squared norms and cache them.
val norms = data.map(Vectors.norm(_, 2.0))//計算每個向量的2範數,即平方和開放
norms.persist()//快取,因為要多次用這個
val zippedData = data.zip(norms).map { case (v, norm) =>//把向量和2範數放在一起,VectorWithNorm是新的結構
new VectorWithNorm(v, norm)
}
val model = runAlgorithm(zippedData)//把VectorWithNorm放裡面執行,返回模型
norms.unpersist()//釋放快取的2範數
// Warn at the end of the run as well, for increased visibility.
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data was not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}
model//返回模型
}
下面看這個模型是怎麼獲得的
private def runAlgorithm(data: RDD[VectorWithNorm]): KMeansModel = {
val sc = data.sparkContext
val initStartTime = System.nanoTime()
// Only one run is allowed when initialModel is given
val numRuns = if (initialModel.nonEmpty) {//如果指定initialModel,這個initialModel就是初始化聚類中心,runs只能是1,這裡的nunRuns就是之前提到的initSteps,如果不指定聚類中心,預設執行5次
if (runs > 1) logWarning("Ignoring runs; one run is allowed when initialModel is given.")
1
} else {
runs
}
val centers = initialModel match {
case Some(kMeansCenters) => {//如果指定聚類中心
Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s)))//把中心弄成一個帶有2範數的VectorWithNorm的陣列
}
case None => {//如果沒指定
if (initializationMode == KMeans.RANDOM) {//看initializationMode是隨機還是用演算法,不管用哪個,最後返回都是聚類中心的陣列,後續我們分別看這兩種方式
initRandom(data)
} else {
initKMeansParallel(data)
}
}
}
val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
" seconds.")
val active = Array.fill(numRuns)(true)//搞兩個陣列,長度為執行次數,一個用true填充,用來表示是否還需要運算,一個用0填充,用來標示成本
val costs = Array.fill(numRuns)(0.0)
var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)//弄一個0-4的可變陣列
var iteration = 0
val iterationStartTime = System.nanoTime()
// Execute iterations of Lloyd's algorithm until all runs have converged
while (iteration < maxIterations && !activeRuns.isEmpty) {//按預設總迭代次數不超過20次,步數還有的迭代,後面會知道,迭代好的會過濾,所以越來越少
type WeightedPoint = (Vector, Long)//定義一個型別
def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {//合併貢獻值
axpy(1.0, x._1, y._1)//把x的向量加到y的向量上
(y._1, x._2 + y._2)//返回y的向量和x+y的長度
}//這個後面會用到
val activeCenters = activeRuns.map(r => centers(r)).toArray//取出每個步驟的中心陣列
val costAccums = activeRuns.map(_ => sc.accumulator(0.0))//每步弄一個累加器,用來累加代價函式
val bcActiveCenters = sc.broadcast(activeCenters)//把中心廣播出去,這倆廣播和累加用的很經典
// Find the sum and count of points mapping to each center
val totalContribs = data.mapPartitions { points =>//分片算資料
val thisActiveCenters = bcActiveCenters.value//獲取每步的聚類中心陣列
val runs = thisActiveCenters.length//獲取步數
val k = thisActiveCenters(0).length//獲取聚類中心的維度,也就是聚成幾個點
val dims = thisActiveCenters(0)(0).vector.size//獲取聚類中心點的向量長度,也就是原始資料的屬性個數
val sums = Array.fill(runs, k)(Vectors.zeros(dims))//搞一個二維陣列,行是執行步數,列是聚類中心個數,每個元素是原始資料維度的向量,也就是一條資料
val counts = Array.fill(runs, k)(0L)//同理,再搞一哥二維陣列,存放每個類別的資料條數
points.foreach { point =>//分片的每條資料
(0 until runs).foreach { i =>//遍歷步數
val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)//傳入每步的聚類中心和固定資料,計算最短距離,返回最好的中心和代價,我們先看findClosest()如何工作的
private[mllib] def findClosest(//返回最近中心的索引和平方距離,注意引數都是帶2範數的
centers: TraversableOnce[VectorWithNorm],
point: VectorWithNorm): (Int, Double) = {
var bestDistance = Double.PositiveInfinity//距離弄成正無窮
var bestIndex = 0//索引弄成0
var i = 0
centers.foreach { center =>//每個中心點與給定點的距離
// Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
// distance computation.//通過不等式簡化距離計算,有魄力
var lowerBoundOfSqDist = center.norm - point.norm//直接2範數差平方替代距離
lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
if (lowerBoundOfSqDist < bestDistance) {//如果比最優距離小
val distance: Double = fastSquaredDistance(center, point)//又調了一個,下面是具體的實現程式碼
private[mllib] def fastSquaredDistance(//\|a - b\|_2^2 = \|a\|_2^2 + \|b\|_2^2 - 2 a^T b.用這個公式近似計算距離,我們捋下思路,先是範數計算近似距離,如果有更近的再進一步算,進一步算也是通過近似計算得到的,都是為了提高效能,確實吊
v1: Vector,
norm1: Double,
v2: Vector,
norm2: Double,
precision: Double = 1e-6): Double = {//這有個引數是精度
val n = v1.size//求向量長度
require(v2.size == n)
require(norm1 >= 0.0 && norm2 >= 0.0)
val sumSquaredNorm = norm1 * norm1 + norm2 * norm2//範數平方和
val normDiff = norm1 - norm2//範數相減,這都是根據公式
var sqDist = 0.0//距離先設為0
/*
* The relative error is
* <pre>
* EPSILON * ( \|a\|_2^2 + \|b\\_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),//根據公式弄了一個相對錯誤,EPSILON是一個趨於1但比1大的數
* </pre>
* which is bounded by
* <pre>
* 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).//搞了一個邊界,我不仔細看了
* </pre>
* The bound doesn't need the inner product, so we can use it as a sufficient condition to//他說邊界不需要計算點積
* check quickly whether the inner product approach is accurate.
*/
val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)//按公式走,這是個邊界
if (precisionBound1 < precision) {//如果邊界比精度小,再按開始的公式算帶點積的,這個太bug了
sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)
} else if (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {///如果邊界大於等於精度
val dotValue = dot(v1, v2)//老老實實算點積
sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0)//按開始的公式算,取與0中較大者
val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) //算邊界二,其實就是一個上界一個下界
(sqDist + EPSILON)//距離+常數
if (precisionBound2 > precision) {//如果大於精度
sqDist = Vectors.sqdist(v1, v2)//計算向量距離的平方,這裡面也分了很多情況,但沒什麼難度
}
} else {
sqDist = Vectors.sqdist(v1, v2)
}
sqDist//最後返回距離的平方,想完全看懂近似計算的應該去看論文了
}
if (distance < bestDistance) {//回到findClosest,如果距離不是最優,賦值,記錄索引,這個都明白
bestDistance = distance
bestIndex = i
}
}
i += 1//索引自加
}
(bestIndex, bestDistance)//返回最優索引,和最優距離
}
costAccums(i) += cost//累加距離
val sum = sums(i)(bestCenter)//拿出最優索引的
axpy(1.0, point.vector, sum)//把點放到sum,為了更新中心點
counts(i)(bestCenter) += 1//最優距離對應的中心累加,留個權重
}
}
val contribs = for (i <- 0 until runs; j <- 0 until k) yield {//每步每個中心點,看下貢獻,捋一下,為什麼迴圈runs應為每次拿資料點相當於同時做5個聚類,為什麼做5個聚類,因為初始化聚類中心會有5組,最後要選出5組最優的
((i, j), (sums(i)(j), counts(i)(j)))
}
contribs.iterator
}.reduceByKey(mergeContribs).collectAsMap()//這裡呼叫mergeContribs,把步相同中心點相同的距離和數量都累加,最後弄成一個map
bcActiveCenters.unpersist(blocking = false)//釋放快取,這個東西其實不大,主要是清理廣播變數
// Update the cluster centers and costs for each active run
for ((run, i) <- activeRuns.zipWithIndex) {
var changed = false
var j = 0
while (j < k) {//k是要聚幾個類
val (sum, count) = totalContribs((i, j))//取出對應的貢獻
if (count != 0) {//如果有貢獻
scal(1.0 / count, sum)//資料除以數量,這就是新的聚類中心
val newCenter = new VectorWithNorm(sum)
if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {//看新舊距離變化是否明顯,如果明顯置為true
changed = true
}
centers(run)(j) = newCenter//舊的不去新的不來
}
j += 1
}
if (!changed) {//如果沒改變
active(run) = false
logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")//就說執行的第幾步運行了幾次結束了
}
costs(run) = costAccums(i).value//取出代價函式
}
activeRuns = activeRuns.filter(active(_))//迭代好的過濾
iteration += 1//迭代次數自增
}
val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.")
if (iteration == maxIterations) {
logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
} else {
logInfo(s"KMeans converged in $iteration iterations.")
}
val (minCost, bestRun) = costs.zipWithIndex.min//取出5步裡面效果最好的,我沒騙你吧
logInfo(s"The cost for the best run is $minCost.")
new KMeansModel(centers(bestRun).map(_.vector))//把最好的聚類中心弄成向量返回
}