程式人生 > 二次排序與分組取TopN

二次排序與分組取TopN

二次排序

// Secondary sort: order lines by (first, second) integer key, descending.
SparkConf sparkConf = new SparkConf()
        .setMaster("local")
        .setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);

// Each input line is expected to hold two space-separated integers, e.g. "3 5".
JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");

// Key every line by its (first, second) pair; keep the raw line as the value for output.
JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(
        new PairFunction<String, SecondSortKey, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<SecondSortKey, String> call(String line) throws Exception {
                String[] splited = line.split(" ");
                int first = Integer.parseInt(splited[0]);
                int second = Integer.parseInt(splited[1]);
                return new Tuple2<SecondSortKey, String>(new SecondSortKey(first, second), line);
            }
        });

// sortByKey(false) sorts descending using SecondSortKey.compareTo.
// rdd.foreach(println) runs on the executors partition by partition, so the printed
// order is not the global sort order (and on a cluster the output lands in executor
// logs). collect() brings the sorted result to the driver and iterates it in order.
for (Tuple2<SecondSortKey, String> tuple : pairSecondRDD.sortByKey(false).collect()) {
    System.out.println(tuple._2);
}

// Release the Spark context once the job is done.
sc.stop();

public class SecondSortKey implements Serializable,Comparable{
/**
*
*/
private static final long serialVersionUID = 1L;
private int first;
private int second;
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public SecondSortKey(int first, int second) {
super();
this.first = first;
this.second = second;
}
@Override
public int compareTo(SecondSortKey o1) {
if(getFirst() - o1.getFirst() ==0 ){
return getSecond() - o1.getSecond();
}else{
return getFirst() - o1.getFirst();
}
}
}

分組取topN和topN
// Grouped Top-N: for each class name, print its highest scores in descending order.
SparkConf conf = new SparkConf()
        .setMaster("local")
        .setAppName("TopOps");
JavaSparkContext sc = new JavaSparkContext(conf);

// Each line is expected to be "<className>\t<score>".
JavaRDD<String> linesRDD = sc.textFile("scores.txt");

JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(
        new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String str) throws Exception {
                String[] splited = str.split("\t");
                String clazzName = splited[0];
                // trim() tolerates stray whitespace around the score column.
                Integer score = Integer.valueOf(splited[1].trim());
                return new Tuple2<String, Integer>(clazzName, score);
            }
        });

// groupByKey materializes all scores of one class together; fine for small demo data.
pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {

    private static final long serialVersionUID = 1L;

    /** How many of the highest scores to keep per class. */
    private static final int TOP_N = 3;

    @Override
    public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
        String clazzName = tuple._1;

        // top[0] is the highest score seen so far; unused slots stay null.
        Integer[] top = new Integer[TOP_N];
        for (Integer score : tuple._2) {
            insertDescending(top, score);
        }

        System.out.println("class Name:" + clazzName);
        for (Integer sscore : top) {
            // Skip empty slots when a class has fewer than TOP_N scores.
            if (sscore != null) {
                System.out.println(sscore);
            }
        }
    }

    /** Inserts score into the descending array top, dropping the smallest if full. */
    private void insertDescending(Integer[] top, int score) {
        for (int i = 0; i < top.length; i++) {
            if (top[i] == null) {
                top[i] = score;
                return;
            }
            if (score > top[i]) {
                // Shift top[i..len-2] down one slot; the last entry falls off.
                System.arraycopy(top, i, top, i + 1, top.length - i - 1);
                top[i] = score;
                return;
            }
        }
    }
});

// Release the Spark context once the job is done.
sc.stop();