Spark:Java實現 二次排序
阿新 • • 發佈:2019-02-10
測試資料
1 5
2 4
3 6
1 3
2 1
輸出結果
1 3
1 5
2 1
2 4
3 6
實現思路:
1.實現自定義的key,要實現Ordered介面和Serializable介面,在key中實現自己對多個列的排序演算法
2.將包含文字的RDD,對映成key為自定義key,value為文字的JavaPairRDD
3.使用sortByKey運算元按照自定義的key進行排序
4.再次對映,剔除自定義的key,而只保留文字行
自定義的key:SecondarySortKey_12
package cn.spark.study.core; import java.io.Serializable; import scala.math.Ordered; /** * 自定義的二次排序key * @author Administrator * */ public class SecondarySortKey_12 implements Ordered<SecondarySortKey_12>,Serializable{ private static final long serialVersionUID = 1L; //首先在自定義的key裡面,定義需要進行排序的列 private int first; private int second; public SecondarySortKey_12(int first, int second) { this.first = first; this.second = second; } @Override public boolean $greater(SecondarySortKey_12 other) { if(this.first > other.getFirst()){ return true; } else if (this.first == other.getFirst() && this.second>other.getSecond()){ return true; } return false; } @Override public boolean $greater$eq(SecondarySortKey_12 other) { if(this.$greater(other)){ return true; } else if (this.first == other.getFirst() && this.second == other.getSecond()){ return true; } return false; } @Override public boolean $less(SecondarySortKey_12 other) { if(this.first<other.getFirst()){ return true; } else if (this.first == other.getFirst() && this.second<other.getSecond()){ return true; } return false; } @Override public boolean $less$eq(SecondarySortKey_12 other) { if(this.$less(other)){ return true; } else if(this.first == other.getFirst() && this.second == other.getSecond()) { return true; } return false; } @Override public int compare(SecondarySortKey_12 other) { if (this.first - other.getFirst() != 0){ return this.first - other.getFirst(); } else { return this.second - other.getSecond(); } } @Override public int compareTo(SecondarySortKey_12 other) { if (this.first - other.getFirst() != 0){ return this.first - other.getFirst(); } else { return this.second - other.getSecond(); } } //為要進行排序的多個列,提供getter和setter方法,以及hascode 和equals方法 public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } @Override public int hashCode() { final int prime = 31; int result = 1; result = 
prime * result + first; result = prime * result + second; return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; SecondarySortKey_12 other = (SecondarySortKey_12) obj; if (first != other.first) return false; if (second != other.second) return false; return true; } }
SecondarySort_12 類:
package cn.spark.study.core; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; public class SecondarySort_12 { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("SecondarySort").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("E://BigData//sparkdata//sort.txt"); JavaPairRDD<SecondarySortKey_12, String> pairs = lines.mapToPair( new PairFunction<String, SecondarySortKey_12, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<SecondarySortKey_12, String> call(String line) throws Exception { String[] lineSplited = line.split(" "); SecondarySortKey_12 key = new SecondarySortKey_12( Integer.valueOf(lineSplited[0]), Integer.valueOf(lineSplited[1])); return new Tuple2<SecondarySortKey_12, String>(key, line); } }); JavaPairRDD<SecondarySortKey_12, String> sortedPairs = pairs.sortByKey(); JavaRDD<String> sortedLines = sortedPairs.map( new Function<Tuple2<SecondarySortKey_12,String>, String>() { private static final long serialVersionUID = 1L; @Override public String call(Tuple2<SecondarySortKey_12, String> v) throws Exception { return v._2; } }); sortedLines.foreach(new VoidFunction<String>() { private static final long serialVersionUID = 1L; @Override public void call(String t) throws Exception { System.out.println(t); } }); sc.close(); } }