Spark取出(Key,Value)型資料中Value值為前n條資料
最近在使用Spark進行一些日誌分析,需要對日誌中的一些(key,value)型資料進行排序,並取出value最多的10條資料。經過查詢資料,發現Spark中的top()函式可以取出排名前n的元素,以及sortBy()函式可以對(key,value)資料根據value進行排序,原以為一切都很好解決,但是實際情況並沒有得到想要的結果資料,研究了部分原始碼,才最終達到了想要的資料,特在此備註和分享。
前期遇到的坑
剛開始,通過查詢資料,知道Spark可以使用sortByKey()和sortBy() 兩個函式對(key,value)型資料排序。於是,直接使用sortByKey()進行排序,排完之後才發現,排序的時候是根據Key排序,而我需要先對Key進行彙總,再根據Value進行排序。顯然,sortByKey不能滿足需求!
於是,開始嘗試使用sortBy()函式,使用方法為 rdd.sortBy(_._2,false),即可對value進行降序排序。在測試的時候,我使用了rdd.sortBy(_._2,false).collect()進行排序和彙總,但是collect()函式會將所有的資料彙總到Driver,當資料量太大時對導致Driver中的記憶體不足。於是,想著只取將10條資料返回給Driver。經過查詢,知道top()函式可以取出前10條資料。
top()函式中的坑及其解決方法
知道可以用top()函式取出前10條資料,以為這麼簡單就能得到想要的資料,好激動!誰知,我還是高興得太早了-_-
後來,靜下心來再看了幾遍,終於發現不是太對了。我的資料型別是(key,value)格式的,rdd.sortBy(_._2,false)中實現了根據value值排序的目的,但是 .top(10) 卻取出了key為前10的資料。top()函式原始碼中,對RDD中的資料進行了reduce操作,並將結果進行排序。所以,rdd.sortBy(_._2,false).top(10) 這段程式碼先是對(key,value)資料根據value進行排序,而top()函式中,資料又再次對key進行了排序,導致之前根絕value排序的結果亂序了,所以最後取到的是key排在前10的資料。這就是導致問題的原因,終於被我發現了!
問題雖然被發現了,但是怎麼解決呢?說實話,我對Scala也不是太瞭解,只能去QQ群裡請教了一些大神。有一位叫做老徐的大神幫我給出瞭解決方法: rdd.sortBy(_._2,false).top(10)(Ordering.by(e => e._2))。再次執行,果然能得到正確結果。後來再仔細想想,覺得sortBy()函式有點多餘,於是變成rdd.top(10)(Ordering.by(e => e._2))。至此,已經能對(key,value)型別的資料進行彙總,然後根據value值進行排序,最後取出value排名前10的資料了。
take()函式實現目標
在請教大神的時候,偶然接觸到了take()函式,經過測試: rdd.sortBy(_._2,false).take(10) 這段程式碼能得到value排名前10的資料。
檢視take()函式的原始碼,如下:
/**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @note due to complications in the internal implementation, this method will raise
* an exception if called on an RDD of `Nothing` or `Null`.
*/
def take(num: Int): Array[T] = withScope {
if (num == 0) {
new Array[T](0)
} else {
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1L
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate
// it by 50%. We also cap the estimation in the end.
if (buf.size == 0) {
numPartsToTry = partsScanned * 4
} else {
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
}
}
val left = num - buf.size
val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += p.size
}
buf.toArray
}
}
該函式的註解指出,take()函式通過掃描一個數據分割槽,並取出該分割槽中的前n個數據,避免了其它分割槽資料的檢索。最主要的是,該函式沒有對父RDD中的資料進行重新分割槽,所以,資料的分割槽和排序順序並沒有改變,因此能取出value排名前10的資料。
總結
經過上面的這些折騰,發現top()函式中所遇到的坑的實質是由於(key,value)資料在sortBy(_._2)函式中根據value進行排序的時候,會進行Shuffle操作,根據value值將原來的資料進行重新分割槽。而sortBy()對資料排序之後,在top()函式中進行排序時,會根據key進行Shuffle操作,並得到根據key排序和分割槽之後的新RDD,所以導致最後的結果跟預期的不一致。
只要肯花時間,自己的潛力還是可以被挖掘出來的!