二次排序與分組取TopN
二次排序
SparkConf sparkConf = new SparkConf()
.setMaster(“local”)
.setAppName(“SecondarySortTest”);
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD secondRDD = sc.textFile(“secondSort.txt”);
JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
/** * */ private static final long serialVersionUID = 1L; @Override public Tuple2<SecondSortKey, String> call(String line) throws Exception { String[] splited = line.split(" "); int first = Integer.valueOf(splited[0]); int second = Integer.valueOf(splited[1]); SecondSortKey secondSortKey = new SecondSortKey(first,second); return new Tuple2<SecondSortKey, String>(secondSortKey,line); }
});
// Sort the pairs by key with ascending = false, then print the value of every pair.
JavaPairRDD<SecondSortKey, String> sortedDescending = pairSecondRDD.sortByKey(false);
sortedDescending.foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {
    private static final long serialVersionUID = 1L;

    /**
     * Prints the value (second element) of one sorted pair.
     *
     * @param entry a (key, line) pair from the sorted RDD
     */
    @Override
    public void call(Tuple2<SecondSortKey, String> entry) throws Exception {
        System.out.println(entry._2);
    }
});
public class SecondSortKey implements Serializable,Comparable{
/**
*
*/
private static final long serialVersionUID = 1L;
private int first;
private int second;
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public SecondSortKey(int first, int second) {
super();
this.first = first;
this.second = second;
}
@Override
public int compareTo(SecondSortKey o1) {
if(getFirst() - o1.getFirst() ==0 ){
return getSecond() - o1.getSecond();
}else{
return getFirst() - o1.getFirst();
}
}
}
分組取TopN和TopN
SparkConf conf = new SparkConf()
.setMaster(“local”)
.setAppName(“TopOps”);
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD linesRDD = sc.textFile(“scores.txt”);
JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String str) throws Exception {
String[] splited = str.split("\t");
String clazzName = splited[0];
Integer score = Integer.valueOf(splited[1]);
return new Tuple2<String, Integer> (clazzName,score);
}
});
// Group the scores by key and print the three largest scores of every group.
pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
    private static final long serialVersionUID = 1L;

    /**
     * Prints the group name followed by its top three scores in descending order.
     * Slots left unfilled because the group has fewer than three scores print as "null".
     *
     * @param tuple a (group name, scores of that group) pair
     */
    @Override
    public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
        String clazzName = tuple._1;
        // Kept sorted in descending order; null marks a slot that is not filled yet.
        Integer[] top3 = new Integer[3];
        for (Integer score : tuple._2) {
            for (int i = 0; i < top3.length; i++) {
                if (top3[i] == null) {
                    // First free slot: everything before it is >= score, so append here.
                    top3[i] = score;
                    break;
                } else if (score > top3[i]) {
                    // Shift the smaller entries one slot down (dropping the last one)
                    // to make room for the new score at position i.
                    for (int j = top3.length - 1; j > i; j--) {
                        top3[j] = top3[j - 1];
                    }
                    top3[i] = score;
                    break;
                }
            }
        }
        System.out.println("class Name:" + clazzName);
        for (Integer sscore : top3) {
            System.out.println(sscore);
        }
    }
});