1. 程式人生 > >Spark:Java實現 二次排序

Spark:Java實現 二次排序

測試資料

1 5
2 4
3 6
1 3
2 1

輸出結果

1 3
1 5
2 1
2 4
3 6 

實現思路:

1.實現自定義的key,要實現Ordered介面和Serializable介面,在key中實現自己對多個列的排序演算法
2.將包含文字的RDD,對映成key為自定義key,value為文字的JavaPairRDD
3.使用sortByKey運算元按照自定義的key進行排序
4.再次對映,剔除自定義的key,而只保留文字行

自定義的key:SecondarySortKey_12

package cn.spark.study.core;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Custom composite key for secondary sorting.
 *
 * <p>Keys are ordered by {@code first}; ties are broken by {@code second}. All ordering
 * operators ({@code <}, {@code <=}, {@code >}, {@code >=}, {@code compareTo}) are derived
 * from {@link #compare(SecondarySortKey_12)} so they can never disagree with each other.
 *
 * @author Administrator
 */
public class SecondarySortKey_12 implements Ordered<SecondarySortKey_12>,Serializable{

    private static final long serialVersionUID = 1L;

    // Columns that take part in the ordering: "first" is the primary sort column,
    // "second" is only consulted when two keys have the same "first".
    private int first;
    private int second;

    /**
     * Creates a key from the two sort columns.
     *
     * @param first  primary sort column
     * @param second secondary sort column, used to break ties on {@code first}
     */
    public SecondarySortKey_12(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** Returns {@code true} when this key sorts strictly after {@code other}. */
    @Override
    public boolean $greater(SecondarySortKey_12 other) {
        return compare(other) > 0;
    }

    /** Returns {@code true} when this key sorts after, or is equal to, {@code other}. */
    @Override
    public boolean $greater$eq(SecondarySortKey_12 other) {
        return compare(other) >= 0;
    }

    /** Returns {@code true} when this key sorts strictly before {@code other}. */
    @Override
    public boolean $less(SecondarySortKey_12 other) {
        return compare(other) < 0;
    }

    /** Returns {@code true} when this key sorts before, or is equal to, {@code other}. */
    @Override
    public boolean $less$eq(SecondarySortKey_12 other) {
        return compare(other) <= 0;
    }

    /**
     * Compares this key with {@code other}, first by {@code first} and then by {@code second}.
     *
     * @param other the key to compare against
     * @return a negative value, zero, or a positive value when this key is respectively
     *         less than, equal to, or greater than {@code other}
     */
    @Override
    public int compare(SecondarySortKey_12 other) {
        // Integer.compare instead of subtraction: "a - b" overflows for operands that are
        // far apart (e.g. Integer.MIN_VALUE - 1) and would report the wrong sign.
        int byFirst = Integer.compare(this.first, other.getFirst());
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.second, other.getSecond());
    }

    /** Same ordering as {@link #compare(SecondarySortKey_12)}. */
    @Override
    public int compareTo(SecondarySortKey_12 other) {
        return compare(other);
    }

    // Getters and setters for the sort columns, plus hashCode and equals.

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /** Hash over both sort columns, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    /** Two keys are equal when they have the same class and the same two column values. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        SecondarySortKey_12 other = (SecondarySortKey_12) obj;
        return first == other.first && second == other.second;
    }
}

SecondarySort_12 類:

package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Spark driver that sorts the lines of a text file by their first two integer columns
 * (first column ascending, then second column ascending) and prints the sorted lines.
 */
public class SecondarySort_12 {

    /** Input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH = "E://BigData//sparkdata//sort.txt";

    /**
     * Runs the secondary sort.
     *
     * @param args optional; {@code args[0]} overrides the input file path, otherwise
     *             {@link #DEFAULT_INPUT_PATH} is read
     */
    public static void main(String[] args) {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        SparkConf conf = new SparkConf().setAppName("SecondarySort").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // try/finally so the context is closed even when a job below throws
        // (e.g. a malformed line raises NumberFormatException).
        try {
            JavaRDD<String> lines = sc.textFile(inputPath);

            // Pair every line with a composite key built from its first two columns;
            // the original line is kept as the value so it can be emitted unchanged.
            JavaPairRDD<SecondarySortKey_12, String> pairs = lines.mapToPair(
                    new PairFunction<String, SecondarySortKey_12, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<SecondarySortKey_12, String> call(String line) throws Exception {
                    // Tolerate leading/trailing whitespace and repeated separators.
                    String[] columns = line.trim().split("\\s+");
                    SecondarySortKey_12 key = new SecondarySortKey_12(
                            Integer.parseInt(columns[0]),
                            Integer.parseInt(columns[1]));

                    return new Tuple2<SecondarySortKey_12, String>(key, line);
                }
            });

            // Ordering comes from the key's own comparison methods.
            JavaPairRDD<SecondarySortKey_12, String> sortedPairs = pairs.sortByKey();

            // Drop the helper key again and keep only the text line.
            JavaRDD<String> sortedLines = sortedPairs.map(
                    new Function<Tuple2<SecondarySortKey_12, String>, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String call(Tuple2<SecondarySortKey_12, String> pair) throws Exception {
                    return pair._2;
                }
            });

            sortedLines.foreach(new VoidFunction<String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public void call(String sortedLine) throws Exception {
                    System.out.println(sortedLine);
                }
            });
        } finally {
            sc.close();
        }
    }
}