Understanding Spark in Depth: Using the sample Operator to Find the Keys That Cause Data Skew
阿新 • Published: 2019-02-07
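While going back over material I had studied earlier, I came across the sample operator, so here is a post on one concrete use of it in real development work.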
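sample(withReplacement: scala.Boolean, fraction: scala.Double, seed: scala.Long)
The sample operator is used for sampling. It takes three parameters:
withReplacement: whether an element is put back after it has been drawn. true means it is put back, so the same element can appear in the sample more than once.
fraction: how much to draw, as a double. Without replacement it is the probability of selecting each element and must lie between 0 and 1, e.g. 0.3 draws roughly 30% of the data. With replacement it is the expected number of times each element is chosen. In both cases the size of the sample is approximate, not exact.
seed: the seed for the random number generator that drives the sampling. In most cases the first two parameters are enough. The seed is mainly useful for debugging: when you cannot tell whether a problem comes from the program or from the data, fix the seed to a constant so that every run draws the same sample.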
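To make the role of fraction and seed concrete, here is a minimal plain-Java sketch of the idea. It is not Spark's implementation: it only assumes the usual scheme for sampling without replacement, in which each element is kept independently with probability fraction. The class `SeedDemo` and the method `bernoulliSample` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SeedDemo {

    // Illustrative only (not Spark's code): keep each element independently
    // with probability `fraction`, using a generator initialised from `seed`.
    public static <T> List<T> bernoulliSample(List<T> data, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : data) {
            if (rng.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }
        List<Integer> first = bernoulliSample(data, 0.3, 42L);
        List<Integer> second = bernoulliSample(data, 0.3, 42L);
        // Same seed and same input: the two samples are identical.
        System.out.println(first.equals(second));
        // The size is usually near 30 but is not guaranteed to be exactly 30.
        System.out.println(first.size());
    }
}
```

In Spark's Java API the corresponding call would be something like `rdd.sample(false, 0.3, 42L)`; passing the same seed on every run is what makes a debugging session repeatable.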
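The code is below.
The rough idea: draw a sample of the data, run a word count on the sample, sort by count, and take the keys that occur most often. Those are the keys causing the data skew.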
package com.lyzx.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Day05 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Day05");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        List<String> keys = getKeyBySample(jsc);
        System.out.println("Keys causing data skew: " + keys);
        jsc.stop();
    }

    /**
     * Samples the data with the sample operator and finds the keys that cause data skew,
     * so that the computation can then be optimized specifically for them.
     * @param jsc the Spark context used to build the RDD
     */
    public static List<String> getKeyBySample(JavaSparkContext jsc) {
        List<String> data = Arrays.asList(
                "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
                "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
                "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
                "B","B","B","B","B","B","B","B","C","D","E","F","G");

        JavaRDD<String> rdd = jsc.parallelize(data, 2);

        // Sample the (key, 1) pairs, count each key within the sample,
        // swap to (count, key), sort by count in descending order and keep the top 3.
        List<Tuple2<Integer, String>> item = rdd.mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .sample(true, 0.4)
                .reduceByKey((x, y) -> x + y)
                .map(x -> new Tuple2<Integer, String>(x._2, x._1))
                .sortBy(x -> x._1, false, 2)
                .take(3);

        List<String> keys = new ArrayList<>();
        System.out.println("keys=" + item);

        // Compare each entry with the one that follows it in the sorted list.
        for (int i = 0; i < item.size() - 1; i++) {
            Tuple2<Integer, String> current = item.get(i);
            Tuple2<Integer, String> next = item.get(i + 1);
            int v1 = current._1;
            int v2 = next._1;
            System.out.println(v1 + " " + v2);

            /**
             * The logic here is flawed: how to identify the keys that cause data skew
             * also depends on the specific business case.
             * This is only a simple check and it has clear limitations.
             */
            if (v1 / v2 >= 3) {
                System.out.println("===");
                keys.add(current._2);
            }
        }
        return keys;
    }
}
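As the comment in the code admits, comparing each count only with the next one is a weak test. With sampled counts of 30, 28 and 2, for instance, only the second key is flagged, although the first is at least as skewed. One possible alternative, sketched below in plain Java so that it runs without a cluster, is to flag every key whose share of the sampled records reaches a threshold. The class `SkewCheck`, the method `skewedKeys` and the 0.4 threshold are assumptions made for illustration, not part of the original code; a sensible cutoff depends on the workload.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SkewCheck {

    // Hypothetical helper: returns the keys whose share of all sampled records
    // is at least minShare (0.4 means "40% of the sample or more").
    public static List<String> skewedKeys(Map<String, Integer> sampledCounts, double minShare) {
        long total = 0;
        for (int count : sampledCounts.values()) {
            total += count;
        }
        List<String> keys = new ArrayList<>();
        if (total == 0) {
            return keys; // empty sample: nothing to flag
        }
        for (Map.Entry<String, Integer> entry : sampledCounts.entrySet()) {
            double share = (double) entry.getValue() / total;
            if (share >= minShare) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Made-up per-key counts, standing in for the result of reduceByKey on a sample.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("A", 30);
        counts.put("B", 28);
        counts.put("C", 2);
        System.out.println(skewedKeys(counts, 0.4)); // prints [A, B]
    }
}
```

Unlike the neighbour comparison, this check treats every key independently, so several hot keys of similar size are all reported.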