1. 程式人生 > >ALS推薦演算法在Spark上的優化從50分鐘到3分鐘

ALS推薦演算法在Spark上的優化從50分鐘到3分鐘

從50多分鐘到3分鐘的優化

某推薦系統需要基於Spark用ALS演算法對近一天的資料進行實時訓練, 然後進行推薦. 輸入的資料有114G, 但訓練時間加上預測的時間需要50多分鐘, 而業務的要求是在15分鐘左右, 遠遠達不到實時推薦的要求, 因此, 我們與業務側一起對Spark應用進行了優化.

另外提一下, 該文最好與之前我寫的另一篇blog < Spark + Kafka 流計算優化 > 一起看, 因為一些細節我不會再在該文中描述.

優化分析

從資料分析, 雖然資料有114G, 但ALS的模型訓練時間並不長, 反而是資料載入和ALS預測所佔用的時間較長. 因此我們把重點放在這兩點的優化中.

從Spark的Web UI可以看到, 資料載入的job中task的數量較多, 較小. 模型預測的job也是有這樣的問題, 因此, 可以猜測並行度過大造成了叢集的協調負荷過重.

僅降低並行度和優化JVM 引數

         我們用常見的”rdd.repartitionBy”把並行度降低後, 作業計算耗時減少並不明顯, 且在模型預測的job中會有executor死掉的現象. 進而檢視日誌, 發現是記憶體佔用過多, yarn把Spark應用給kill了.

         為解決該現象, 加入這些JVM引數: “ -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:MaxTenuringThreshold=72 -XX:NewRatio=2 -XX:SurvivorRatio=6 -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+CMSPermGenSweepingEnabled -XX:+CMSClassUnloadingEnabled -XX:MaxTenuringThreshold=31 -XX:SurvivorRatio=8 -XX:+ExplicitGCInvokesConcurrent -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses -XX:+AlwaysPreTouch -XX:-OmitStackTraceInFastThrow -XX:+UseCompressedStrings -XX:+UseStringCache -XX:+OptimizeStringConcat -XX:+UseCompressedOops -XX:+CMSScavengeBeforeRemark -XX:+UseBiasedLocking -XX:+AggressiveOpts " 具體說明和原因請參考本人的另一篇blog < Spark + Kafka 流計算優化 >.

笛卡爾積操作中的預先分塊

作了上述例行優化後,  ALS預測步驟的耗時減少依然不明顯, 為發掘原因, 我們看了下原始碼, 驚奇的發現在ALS預測中竟然是用了笛卡爾積操作, 114G的資料少說也有幾千萬行記錄, 幾千萬行記錄進行笛卡爾積, 不慢才怪吧.

還好, 我們有擴充套件版的ALS預測方法, 可以將資料預先分塊, 而不必一行行的進行笛卡爾積, 加快了笛卡爾積的速度.

val recommendations = ExtMatrixFactorizationModelHelper.recommendProductsForUsersmodel.getnumRecommendations

420000StorageLevel.MEMORY_AND_DISK_SER )

該方法會對model中的userFeatures和itemFeatures矩陣進行預先分塊, 減少網路包的傳輸量和笛卡爾積的計算量.

這裡的第三個引數是每一個塊所包含的行數, 此處的420000表示當我們對userFeatures或itemFeatures進行分塊時, 每一個塊包含了矩陣的420000行.

如何計算這裡的一個塊要包含多少行呢, 舉例如下:

由於ALS演算法中設定的rank是10, 因此生成的userFeatures和itemFeatures的個數是10, 它們的每一行是(Int,Array[Double]), 其中Array.size是10.

因此可根據如下計算每行所佔的空間大小:

空Array[10]的大小=16+8*10=96Byte. 陣列中的元素是Double, 十個Double物件的大小是 16*10=160Byte. 作為key的Integer的大小是16Byte. 因此每行佔空間96+160+16=212Byte.

另外,要計算頻寬: 由於是千兆網絡卡,因此頻寬為1Gbit,轉換為Byte也就是128MByte的頻寬.

考慮到” spark.akka.frameSize=100”以及網路包包頭需要佔的空間, 和Java的各種封裝要佔的空間, 我們計劃讓1個block就幾乎佔滿頻寬, 也就是一個block會在100MByte左右.

因此, 一個block要佔 60*1024*1024/212=296766行, 因此blocksize=494611, 考慮到各個object也佔記憶體, 因此行數定為420000左右.

在分塊後ExtMatrixFactorizationModelHelper.recommendProductsForUsers中會對塊進行重新分割槽, 以達到基於塊的均勻分佈.

提高檔案載入速度

         以前都是載入小檔案, 每個檔案才幾M大小, 遠遠低於Hadoop的塊大小, 使得IO頻繁, 檔案也頻繁開啟關閉, 載入速度自然就慢. 為解決該問題, 我們使用sc.wholeTextFiles(dirstr,inputSplitNum)來載入HDFS的檔案到Spark中. 該方法使用Hadoop的CombineFileInputFormat把多個小檔案合併成一個Split再載入到Spark中.

         但其實, 載入速度對整個Job的執行效率影響不大, 效果有限.


上圖,是val inputData = sc.wholeTextFiles(dirstr,80) 和RangePartition.

貌似載入速度也好不到哪兒去.第一個job用了11min

'

上圖用的是valinputData = sc.textFile(dirstr)RangeRepartition,同等情況下,載入也需要11min. (job6)


上圖,是val inputData = sc.wholeTextFiles(dirstr,120) 和RangePartition.

貌似提高了wholeTextFiles()的split數量可以提升效能, 從8.4min多(此時split為80左右)到現在的8.1min


上圖,是val inputData = sc.wholeTextFiles(dirstr,220) 和RangePartition. 第一次載入提升到6.0min.

其它job由於loclity的問題,時間有所拉長, 影響不大.

 

上圖,是val inputData = sc.wholeTextFiles(dirstr,512) 和RangePartition. 第一次載入要7.1min. 估計220個split應該是個比較好的值了. 按比例就是 220(file split數) / 27000(小檔案數) = 0.8% , 該場景中每個小檔案10M左右. 也就是每個file split包含123個小檔案­­­­, 每個file split 1230M, 也就是約1G左右.

減少笛卡爾積計算量

回到對笛卡爾積計算的優化, 因為50分鐘的計算量基本上都是耗在笛卡爾積的計算上的.

         我們先看一下task的分佈圖, 由圖看到, 資料並不是十分雜湊:

   猜測原因肯是通過Array.mkString.hashCode作為key並不能保證資料的均勻雜湊.因此我們disable掉了在笛卡爾積計算前的預分塊時的再分割槽而是把再分割槽提到分塊之前.

這樣一來, task分佈稍微均衡了一些, 但依然不甚理想. 為了合理的降低task數量和均勻task的分佈, 我們進一步使用了Spark擴充套件版本的自動分割槽功能.

         val userFactors = ExtSparkHelper.repartionPairRDDBySize(oldUsrFactThreeM)

         這個方法有兩個引數, 第一個是需要分割槽的RDD, 第二個是我們期望的每一個task的input data的大小. 一般來說, input data與計算產生的data的大小相差不大, 但笛卡爾積卻不同, 有可能產生上百倍的中間資料量. 因此, 我這裡設定的每個task的input data是3M, 計算產生的中間資料剛好在1G左右, 一個executor可以同時跑3個task, 也算是比較理想的.

優化後的task分佈圖也比較理想, 十分均勻且沒有任何浪費, 如下:


因此, 這一步優化後, 原笛卡爾積的執行速度從幾十分鐘變為十幾秒.

整個ALS應用原來跑114G資料需要20多分鐘,如下圖現在只需要不到4min:


試了一下跑一天的全量資料231.9G, 則原來需要1個多小時,現在只要不到6分鐘如下圖: