Two Approaches to Complex TopN Computation with Spark

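Suppose you have product category data as a pairRDD(categoryId, clickCount_orderCount_payCount). How would you compute the Top 5 with Spark?

Assuming the code is written in Java, there are two approaches:

1. Reduce the data to an RDD(categoryObject), where categoryObject implements java.lang.Comparable, then call top(5) to get the top N.

2. Convert the data to a PairRDD(categoryKey, info), where categoryKey implements scala.math.Ordered, then sort with sortByKey (in descending order) and call take(5).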

Notes:

1) In Spark's Java API, top(n) compares elements through java.lang.Comparable.

2) sortByKey in Spark's Java API, by contrast, still compares keys through scala.math.Ordered.

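By comparison, approach 2 costs more time and space than approach 1. If you need the fully sorted result anyway and want the top N as a by-product, however, approach 2 is the better choice.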
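
The cost difference can be seen without a cluster. The following is a minimal plain-Java sketch, not Spark code; CategoryStat, TopNSketch, and topN are hypothetical names introduced here for illustration. It keeps at most n candidates in a min-heap while scanning the data, which is roughly the idea behind a top(n)-style operation, whereas a full sort has to order every element before the first n can be taken.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class TopNSketch {

    /** Hypothetical value object for approach 1: ordered by pay, then order, then click count. */
    static final class CategoryStat implements Comparable<CategoryStat> {
        final String categoryId;
        final long clickCount;
        final long orderCount;
        final long payCount;

        CategoryStat(String categoryId, long clickCount, long orderCount, long payCount) {
            this.categoryId = categoryId;
            this.clickCount = clickCount;
            this.orderCount = orderCount;
            this.payCount = payCount;
        }

        @Override
        public int compareTo(CategoryStat other) {
            int result = Long.compare(payCount, other.payCount);
            if (result != 0) {
                return result;
            }
            result = Long.compare(orderCount, other.orderCount);
            if (result != 0) {
                return result;
            }
            return Long.compare(clickCount, other.clickCount);
        }
    }

    /** Returns the n largest items, largest first, holding at most n + 1 items in the heap. */
    static <T extends Comparable<T>> List<T> topN(Iterable<T> items, int n) {
        PriorityQueue<T> heap = new PriorityQueue<>(); // min-heap on natural ordering
        for (T item : items) {
            heap.offer(item);
            if (heap.size() > n) {
                heap.poll(); // evict the smallest candidate seen so far
            }
        }
        List<T> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        List<CategoryStat> stats = Arrays.asList(
                new CategoryStat("a", 10, 5, 1),
                new CategoryStat("b", 1, 1, 3),
                new CategoryStat("c", 500, 2, 1));
        for (CategoryStat stat : topN(stats, 2)) {
            System.out.println(stat.categoryId);
        }
    }
}
```

With k much smaller than the data size, the heap version does about n log k comparisons and holds only k elements, while sorting does about n log n comparisons over the whole data set.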

Implementation of approach 2:

CategoryKey:

package com.stan.core.spark.userAction;

import java.io.Serializable;
import scala.math.Ordered;

/**
 * Sort key for categories. Priority: payment count first, then order count,
 * then click count.
 */
public class ComparableCategoryKey
        // Comparable from the Scala side, so that the RDD can be sorted by this key
        implements Ordered<ComparableCategoryKey>, Serializable {

    String categoryId;
    Long clickCategoryCount;
    Long orderCategoryCount;
    Long defrayCategoryCount;

    /**
     * Compares field by field: payment count first; if equal, order count;
     * if still equal, click count.
     *
     * A weighted sum of the differences (pay * 10000 + order * 100 + click)
     * is avoided on purpose: it gives the wrong sign once a lower-priority
     * difference reaches its weight, and truncating it to int loses information.
     *
     * @param other the key to compare against
     * @return a negative number, zero, or a positive number if this key is
     *         less than, equal to, or greater than the other key
     */
    @Override
    public int compare(ComparableCategoryKey other) {
        int result = Long.compare(defrayCategoryCount, other.defrayCategoryCount);
        if (result != 0) {
            return result;
        }
        result = Long.compare(orderCategoryCount, other.orderCategoryCount);
        if (result != 0) {
            return result;
        }
        return Long.compare(clickCategoryCount, other.clickCategoryCount);
    }

    @Override
    public int compareTo(ComparableCategoryKey other) {
        return compare(other);
    }

    @Override
    public boolean $less(ComparableCategoryKey other) {
        return compare(other) < 0;
    }

    @Override
    public boolean $greater(ComparableCategoryKey other) {
        return compare(other) > 0;
    }

    @Override
    public boolean $less$eq(ComparableCategoryKey other) {
        return compare(other) <= 0;
    }

    @Override
    public boolean $greater$eq(ComparableCategoryKey other) {
        return compare(other) >= 0;
    }

    public String getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(String categoryId) {
        this.categoryId = categoryId;
    }

    public Long getClickCategoryCount() {
        return clickCategoryCount;
    }

    public void setClickCategoryCount(Long clickCategoryCount) {
        this.clickCategoryCount = clickCategoryCount;
    }

    public Long getOrderCategoryCount() {
        return orderCategoryCount;
    }

    public void setOrderCategoryCount(Long orderCategoryCount) {
        this.orderCategoryCount = orderCategoryCount;
    }

    public Long getDefrayCategoryCount() {
        return defrayCategoryCount;
    }

    public void setDefrayCategoryCount(Long defrayCategoryCount) {
        this.defrayCategoryCount = defrayCategoryCount;
    }

    @Override
    public String toString() {
        return "ComparableCategoryKey{" +
                "categoryId='" + categoryId + '\'' +
                ", clickCategoryCount=" + clickCategoryCount +
                ", orderCategoryCount=" + orderCategoryCount +
                ", defrayCategoryCount=" + defrayCategoryCount +
                '}';
    }
}
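
A weighted difference of the form pay * 10000 + order * 100 + click is a tempting shortcut for this priority order, but it only holds while every lower-priority difference stays below its weight, and reducing the result with % 1000 before casting to int can turn a real difference into zero. The plain-Java sketch below shows both failure modes next to a field-by-field comparator. CompareSketch, weightedDiff, and BY_PRIORITY are hypothetical names, and each category is represented as a bare {click, order, pay} array purely to keep the example short.

```java
import java.util.Comparator;

public class CompareSketch {

    /** Weighted difference of two {click, order, pay} arrays. */
    static long weightedDiff(long[] a, long[] b) {
        return (a[2] - b[2]) * 10000 + (a[1] - b[1]) * 100 + (a[0] - b[0]);
    }

    /** Field-by-field comparison: pay first, then order, then click. */
    static final Comparator<long[]> BY_PRIORITY =
            Comparator.<long[]>comparingLong(c -> c[2])
                    .thenComparingLong(c -> c[1])
                    .thenComparingLong(c -> c[0]);

    public static void main(String[] args) {
        long[] onePayment = {0, 0, 1};    // 1 payment, nothing else
        long[] manyOrders = {0, 200, 0};  // 200 orders, no payment
        long[] empty = {0, 0, 0};

        // Failure 1: 200 orders outweigh 1 payment, so the sign is wrong.
        System.out.println(weightedDiff(onePayment, manyOrders) < 0);

        // Failure 2: 10000 % 1000 == 0, so the two keys look equal.
        System.out.println((int) (weightedDiff(onePayment, empty) % 1000) == 0);

        // The field-by-field comparator ranks onePayment higher in both cases.
        System.out.println(BY_PRIORITY.compare(onePayment, manyOrders) > 0);
        System.out.println(BY_PRIORITY.compare(onePayment, empty) > 0);
    }
}
```

Comparing one field at a time also sidesteps overflow, since Long.compare never subtracts its arguments.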

The calling code:

import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// 1. Wrap each record as (categoryKey, info)
JavaPairRDD<ComparableCategoryKey, String> comparableCategory2AllCountRDD =
        categoryId2allCount.mapToPair(
                new PairFunction<Tuple2<String, String>, ComparableCategoryKey, String>() {
                    @Override
                    public Tuple2<ComparableCategoryKey, String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                        String categoryId = stringStringTuple2._1;
                        String allCount = stringStringTuple2._2;
                        // allCount has the form clickCount_orderCount_payCount
                        String[] tmpAllCountSplited = allCount.split("_");
                        Long clickCount = Long.valueOf(tmpAllCountSplited[0]);
                        Long orderCount = Long.valueOf(tmpAllCountSplited[1]);
                        Long defrayCount = Long.valueOf(tmpAllCountSplited[2]);
                        ComparableCategoryKey comparableCategoryWithAllCount =
                                new ComparableCategoryKey();
                        comparableCategoryWithAllCount.setCategoryId(categoryId);
                        comparableCategoryWithAllCount.setClickCategoryCount(clickCount);
                        comparableCategoryWithAllCount.setOrderCategoryCount(orderCount);
                        comparableCategoryWithAllCount.setDefrayCategoryCount(defrayCount);
                        return new Tuple2<>(comparableCategoryWithAllCount, allCount);
                    }
                }
        );

// 2. Sort by key in descending order. sortByKey returns a new RDD and leaves
//    the original untouched, so the result must be kept.
JavaPairRDD<ComparableCategoryKey, String> sortedCategoryRDD =
        comparableCategory2AllCountRDD.sortByKey(false);

// 3. Take the first five
List<Tuple2<ComparableCategoryKey, String>> top5Categories = sortedCategoryRDD.take(5);
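
To check the parse, sort, and take steps on a handful of records before running them on a cluster, the same logic can be mimicked with Java streams. This is only a local stand-in under that assumption, not Spark code; LocalTop5Sketch, parseCounts, and top are hypothetical names. It also makes the sort direction explicit: the largest keys come first only because the comparator is reversed.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class LocalTop5Sketch {

    /** Parses "click_order_pay" into {click, order, pay}. */
    static long[] parseCounts(String allCount) {
        String[] parts = allCount.split("_");
        return new long[] {
                Long.parseLong(parts[0]),
                Long.parseLong(parts[1]),
                Long.parseLong(parts[2])
        };
    }

    /** Ascending order by pay, then order, then click count. */
    static final Comparator<String> BY_PAY_ORDER_CLICK =
            Comparator.<String>comparingLong(s -> parseCounts(s)[2])
                    .thenComparingLong(s -> parseCounts(s)[1])
                    .thenComparingLong(s -> parseCounts(s)[0]);

    /** Sorts in descending order and keeps the first n records. */
    static List<String> top(List<String> allCounts, int n) {
        return allCounts.stream()
                .sorted(BY_PAY_ORDER_CLICK.reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("10_5_1", "1_1_3", "500_2_1", "7_9_0");
        System.out.println(top(records, 2));
    }
}
```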
