Understanding Spark in Depth: Using the sample Operator to Find the Keys That Cause Data Skew

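While going back over material I had studied earlier, I came across the sample operator, so I wrote this post about a concrete use of sample in real development work.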

sample(withReplacement : scala.Boolean, fraction : scala.Double, seed : scala.Long)

The sample operator draws a random sample from an RDD. It takes three parameters:

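withReplacement: whether an element is put back after it has been drawn. true means it is put back, so the sample may contain the same element more than once.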

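fraction: how much to draw. It is a double. Without replacement it lies between 0 and 1, e.g. 0.3 draws roughly 30% of the data; with replacement it is the expected number of times each element is drawn. In both cases the size of the sample is not guaranteed to match the fraction exactly.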

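seed: the seed for the random number generator that drives the sampling. In most cases the first two parameters are enough. So what is this one for? It is mainly used for debugging: when you cannot tell whether the program or the data is at fault, fix the seed to a constant so that repeated runs over the same data draw the same sample.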
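To make the three parameters concrete, here is a minimal plain-Java sketch that imitates their semantics on an in-memory list. It is not Spark's implementation: the class `SampleSemanticsSketch` and its two methods are hypothetical names for illustration, and the per-element Bernoulli and Poisson draws are an assumption about how sampling without and with replacement behaves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

class SampleSemanticsSketch {

    // Without replacement: keep each element independently with probability `fraction`.
    static <T> List<T> sampleWithoutReplacement(List<T> data, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : data) {
            if (rnd.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }

    // With replacement: emit each element k times, where k is drawn from a
    // Poisson distribution with mean `fraction` (Knuth's multiplication method).
    static <T> List<T> sampleWithReplacement(List<T> data, double fraction, long seed) {
        Random rnd = new Random(seed);
        double limit = Math.exp(-fraction);
        List<T> out = new ArrayList<>();
        for (T item : data) {
            int k = 0;
            double p = rnd.nextDouble();
            while (p > limit) {
                k++;
                p *= rnd.nextDouble();
            }
            for (int i = 0; i < k; i++) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("A", "A", "A", "A", "B", "B", "C", "D");
        // Same seed, same data: both calls return the same sample.
        System.out.println(sampleWithoutReplacement(data, 0.4, 42L));
        System.out.println(sampleWithoutReplacement(data, 0.4, 42L));
        // With replacement an element may show up more than once.
        System.out.println(sampleWithReplacement(data, 0.4, 42L));
    }
}
```

The fixed seed is what makes the first two printed lists identical, which is the property that helps when debugging.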

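Here is the code.

The general idea: draw a sample, run a word count over the sample, sort by count, and take the keys that occur most often. Those are the keys causing the data skew.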

package com.lyzx.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Day05 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Day05");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        List<String> keys = getKeyBySample(jsc);
        System.out.println("Keys causing data skew: "+keys);
        jsc.stop();
    }

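    /**
     * Samples the data with the sample operator and finds the keys that cause data skew,
     * so that the computation can then be optimized specifically for them.
     * @param jsc the JavaSparkContext used to build the RDD
     * @return the keys judged to be causing data skew
     */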
    public static List<String> getKeyBySample(JavaSparkContext jsc){
        List<String> data = Arrays.asList("A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
                "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
                "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
                "B","B","B","B","B","B","B","B","C","D","E","F","G");

        JavaRDD<String> rdd =  jsc.parallelize(data,2);
        // Sample with replacement at fraction 0.4, count each key in the sample,
        // then swap to (count, key) so the result can be sorted by count, descending.
        List<Tuple2<Integer,String>> item =
                rdd.mapToPair(x->new Tuple2<String,Integer>(x,1))
                .sample(true,0.4)
                .reduceByKey((x,y)->x+y)
                .map(x->new Tuple2<Integer,String>(x._2(),x._1()))
                .sortBy(x->x._1(),false,2)
                .take(3);

        List<String> keys = new ArrayList<>();
        System.out.println("keys="+item);
        // Compare each count with the next-largest one; the last entry has nothing to compare against.
        for(int i=0;i<item.size()-1;i++){
            Tuple2<Integer,String> current = item.get(i);
            Tuple2<Integer,String> next = item.get(i+1);
            int v1 = current._1();
            int v2 = next._1();
            System.out.println(v1+"   "+v2);

            /**
             * This logic is imperfect: how to identify the keys causing data skew
             * also depends on the specific business case.
             * This is only a simple check and is quite limited.
             */
            if(v1/v2 >= 3){
                System.out.println("===");
                keys.add(current._2());
            }
        }
        return keys;
    }
}
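Since the ratio check above is, by its own admission, quite limited, one alternative is to flag every key whose share of the sample reaches some threshold. The sketch below does this in plain Java on a list of sampled keys that has already been collected to the driver. `SkewKeyByShare`, `skewedKeys`, and the 0.5 threshold are assumptions for illustration and are not part of the original code; the right threshold depends on the business case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SkewKeyByShare {

    // Returns, in sorted order, the keys that make up at least `minShare`
    // of the sampled records. `minShare` is a hypothetical tuning knob.
    static List<String> skewedKeys(List<String> sampledKeys, double minShare) {
        Map<String, Integer> counts = new HashMap<>();
        for (String key : sampledKeys) {
            counts.merge(key, 1, Integer::sum);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            double share = (double) e.getValue() / sampledKeys.size();
            if (share >= minShare) {
                result.add(e.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        // A made-up sample in which "A" dominates.
        List<String> sample = Arrays.asList("A", "A", "A", "A", "A", "A", "A", "A", "B", "C");
        System.out.println(skewedKeys(sample, 0.5));
    }
}
```

Unlike the neighbour-to-neighbour ratio, this check can report several heavy keys at once, and it reports nothing when the keys are evenly spread.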