Two Approaches to Complex TopN Computation with Spark
阿新 • Published: 2019-04-29
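Suppose you have product category data as a pairRDD(categoryId, clickCount_orderCount_payCount). How would you compute the Top 5 with Spark?
Assuming the code is written in Java, there are two approaches:
1. Reduce the data to an RDD(categoryObject), where categoryObject implements java.lang.Comparable, then call top(5) to get the top N.
2. Convert the data to a PairRDD(categoryKey, info), where categoryKey implements scala.math.Ordered, then sort with sortByKey (in descending order) and call take(5).
Note:
1) In Spark's Java API, top(n) compares elements through java.lang.Comparable.
2) In this approach the key passed to sortByKey in Spark's Java API is still written against scala.math.Ordered (which itself extends java.lang.Comparable).
By comparison, approach 2 costs more time and space than approach 1, because sortByKey sorts the entire dataset while top(n) only has to keep track of the n largest elements. Approach 2 is still the better choice if you need the fully sorted result anyway and want the top N along the way.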
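The article only implements approach 2, so here is a minimal sketch of the idea behind approach 1, written in plain Java so it runs without a cluster. `CategoryStats` and `TopNSketch` are hypothetical names that do not appear in the original code, and the bounded min-heap is only a simplified illustration of keeping the n largest elements without sorting everything. With a real `JavaRDD<CategoryStats>`, the same ordering would presumably be used by calling `rdd.top(5)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical value class for approach 1.
// Natural ordering: pay count first, then order count, then click count.
class CategoryStats implements Comparable<CategoryStats>, java.io.Serializable {
    final String categoryId;
    final long clickCount;
    final long orderCount;
    final long payCount;

    CategoryStats(String categoryId, long clickCount, long orderCount, long payCount) {
        this.categoryId = categoryId;
        this.clickCount = clickCount;
        this.orderCount = orderCount;
        this.payCount = payCount;
    }

    @Override
    public int compareTo(CategoryStats other) {
        int c = Long.compare(payCount, other.payCount);
        if (c != 0) return c;
        c = Long.compare(orderCount, other.orderCount);
        if (c != 0) return c;
        return Long.compare(clickCount, other.clickCount);
    }
}

class TopNSketch {
    // Keeps at most n elements in a min-heap, so memory stays proportional to n.
    static <T extends Comparable<T>> List<T> topN(Iterable<T> items, int n) {
        PriorityQueue<T> heap = new PriorityQueue<>();
        for (T item : items) {
            heap.add(item);
            if (heap.size() > n) {
                heap.poll(); // drop the smallest element seen so far
            }
        }
        List<T> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder()); // largest first
        return result;
    }

    public static void main(String[] args) {
        List<CategoryStats> data = Arrays.asList(
                new CategoryStats("a", 100, 5, 1),
                new CategoryStats("b", 1, 1, 2),
                new CategoryStats("c", 50, 9, 1));
        for (CategoryStats s : topN(data, 2)) {
            System.out.println(s.categoryId);
        }
    }
}
```

Because the ordering lives in `compareTo`, no separate key class or pair RDD is needed for this approach.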
Implementation of approach 2:
CategoryKey:
package com.stan.core.spark.userAction;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Sort key ordered by priority:
 * pay count first; if equal, order count; if still equal, click count.
 */
public class ComparableCategoryKey
        // Comparable from Scala's point of view, so the RDD can be sorted by this key
        implements Ordered<ComparableCategoryKey>, Serializable {

    String categoryId;
    Long clickCategoryCount;
    Long orderCategoryCount;
    Long defrayCategoryCount;

    /**
     * Compares field by field in priority order.
     * Returns a negative value if this key is smaller, a positive value if it is larger, 0 if equal.
     */
    @Override
    public int compare(ComparableCategoryKey other) {
        int result = Long.compare(this.defrayCategoryCount, other.defrayCategoryCount);
        if (result != 0) {
            return result;
        }
        result = Long.compare(this.orderCategoryCount, other.orderCategoryCount);
        if (result != 0) {
            return result;
        }
        return Long.compare(this.clickCategoryCount, other.clickCategoryCount);
    }

    @Override
    public int compareTo(ComparableCategoryKey other) {
        return compare(other);
    }

    @Override
    public boolean $less(ComparableCategoryKey other) {
        return compare(other) < 0;
    }

    @Override
    public boolean $greater(ComparableCategoryKey other) {
        return compare(other) > 0;
    }

    @Override
    public boolean $less$eq(ComparableCategoryKey other) {
        return compare(other) <= 0;
    }

    @Override
    public boolean $greater$eq(ComparableCategoryKey other) {
        return compare(other) >= 0;
    }

    public String getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(String categoryId) {
        this.categoryId = categoryId;
    }

    public Long getClickCategoryCount() {
        return clickCategoryCount;
    }

    public void setClickCategoryCount(Long clickCategoryCount) {
        this.clickCategoryCount = clickCategoryCount;
    }

    public Long getOrderCategoryCount() {
        return orderCategoryCount;
    }

    public void setOrderCategoryCount(Long orderCategoryCount) {
        this.orderCategoryCount = orderCategoryCount;
    }

    public Long getDefrayCategoryCount() {
        return defrayCategoryCount;
    }

    public void setDefrayCategoryCount(Long defrayCategoryCount) {
        this.defrayCategoryCount = defrayCategoryCount;
    }

    @Override
    public String toString() {
        return "ComparableCategoryKey{"
                + "categoryId='" + categoryId + '\''
                + ", clickCategoryCount=" + clickCategoryCount
                + ", orderCategoryCount=" + orderCategoryCount
                + ", defrayCategoryCount=" + defrayCategoryCount
                + '}';
    }
}
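When a key has several fields with strict priorities, it is tempting to collapse the differences into one weighted number, such as payDiff * 10000 + orderDiff * 100 + clickDiff. The sketch below, in plain Java with hypothetical names (`Counts`, `PriorityCompareSketch`), illustrates why a field-by-field comparison is the safer choice: a large difference in a low-priority field can outweigh a difference in a high-priority one. The field-by-field version uses the JDK's `Comparator` chaining.

```java
import java.util.Comparator;

// Hypothetical holder for the three counters of one category.
class Counts {
    final long click;
    final long order;
    final long pay;

    Counts(long click, long order, long pay) {
        this.click = click;
        this.order = order;
        this.pay = pay;
    }
}

class PriorityCompareSketch {
    // Weighted-sum comparison: the sign depends on the magnitudes of all three
    // differences, so many clicks can outweigh a difference in payments.
    static int weighted(Counts a, Counts b) {
        long n = (a.pay - b.pay) * 10000
                + (a.order - b.order) * 100
                + (a.click - b.click);
        return Long.signum(n);
    }

    // Field-by-field comparison: a lower-priority field is consulted
    // only when all higher-priority fields are equal.
    static final Comparator<Counts> LEXICOGRAPHIC =
            Comparator.comparingLong((Counts c) -> c.pay)
                    .thenComparingLong(c -> c.order)
                    .thenComparingLong(c -> c.click);

    public static void main(String[] args) {
        Counts paid = new Counts(0, 0, 1);         // one payment, no clicks
        Counts clicked = new Counts(20000, 0, 0);  // many clicks, no payment
        // The weighted sum ranks "paid" below "clicked"; the chained comparator does not.
        System.out.println("weighted: " + weighted(paid, clicked));
        System.out.println("lexicographic: " + LEXICOGRAPHIC.compare(paid, clicked));
    }
}
```

The weights only behave correctly while every lower-priority difference stays below the next weight, which real click counts are unlikely to guarantee.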
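The calling code:
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// The statements below run inside the driver method; categoryId2allCount is the
// (categoryId, clickCount_orderCount_payCount) pair RDD from the problem statement.

// 1. Wrap each record as (categoryKey, info)
JavaPairRDD<ComparableCategoryKey, String> comparableCategory2AllCountRDD = categoryId2allCount.mapToPair(
        new PairFunction<Tuple2<String, String>, ComparableCategoryKey, String>() {
            @Override
            public Tuple2<ComparableCategoryKey, String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                String categoryId = stringStringTuple2._1;
                String allCount = stringStringTuple2._2;
                // allCount has the form clickCount_orderCount_payCount
                String[] tmpAllCountSplited = allCount.split("_");
                Long clickCount = Long.valueOf(tmpAllCountSplited[0]);
                Long orderCount = Long.valueOf(tmpAllCountSplited[1]);
                Long defrayCount = Long.valueOf(tmpAllCountSplited[2]);
                ComparableCategoryKey comparableCategoryWithAllCount = new ComparableCategoryKey();
                comparableCategoryWithAllCount.setCategoryId(categoryId);
                comparableCategoryWithAllCount.setClickCategoryCount(clickCount);
                comparableCategoryWithAllCount.setOrderCategoryCount(orderCount);
                comparableCategoryWithAllCount.setDefrayCategoryCount(defrayCount);
                return new Tuple2<>(comparableCategoryWithAllCount, allCount);
            }
        });

// 2. Sort with sortByKey. It returns a new RDD rather than sorting in place,
//    and sorts ascending by default, so pass false to put the largest keys first.
JavaPairRDD<ComparableCategoryKey, String> sortedCategoryRDD = comparableCategory2AllCountRDD.sortByKey(false);

// 3. Take the first five
List<Tuple2<ComparableCategoryKey, String>> top5Categories = sortedCategoryRDD.take(5);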