Spark:高階排序(二次排序)
阿新 • • 發佈:2018-12-14
為了多維的排序,需要考慮多個條件,這要求我們自定義key
1 23
3 22
3 31
1 12
2 11
4 45
二、使用java實現
2.1、自定義key
使用scala.math.Ordered介面,實現Serializable介面
package com.chb.sparkDemo.secondarySort;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Composite key for Spark secondary sort.
 *
 * Orders records by firstKey first, breaking ties with secondKey.
 * Implements scala.math.Ordered so sortByKey can compare keys, and
 * Serializable so Spark can ship key instances across the cluster.
 *
 * @author 12285
 */
public class MyKey implements Ordered<MyKey>, Serializable {

    private int firstKey;
    private int secondKey;

    public MyKey(int firstKey, int secondKey) {
        super();
        this.firstKey = firstKey;
        this.secondKey = secondKey;
    }

    public int getFirstKey() {
        return firstKey;
    }

    public int getSecondKey() {
        return secondKey;
    }

    public void setFirstKey(int firstKey) {
        this.firstKey = firstKey;
    }

    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    // The $greater/$less family is what scala.math.Ordered's >, >=, <, <=
    // operators desugar to. All four delegate to compareTo so the ordering
    // has a single source of truth instead of four hand-written copies.
    public boolean $greater(MyKey other) {
        return compareTo(other) > 0;
    }

    public boolean $greater$eq(MyKey other) {
        return compareTo(other) >= 0;
    }

    public boolean $less(MyKey other) {
        return compareTo(other) < 0;
    }

    public boolean $less$eq(MyKey other) {
        return compareTo(other) <= 0;
    }

    public int compare(MyKey other) {
        return compareTo(other);
    }

    /**
     * Compares by firstKey, then secondKey.
     *
     * Fix: the original returned {@code this.firstKey - other.firstKey},
     * which overflows for large-magnitude values (e.g. Integer.MAX_VALUE
     * vs. a negative key) and silently inverts the ordering.
     * Integer.compare is overflow-safe.
     */
    public int compareTo(MyKey other) {
        int cmp = Integer.compare(this.firstKey, other.firstKey);
        return cmp != 0 ? cmp : Integer.compare(this.secondKey, other.secondKey);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + firstKey;
        result = prime * result + secondKey;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        MyKey other = (MyKey) obj;
        return firstKey == other.firstKey && secondKey == other.secondKey;
    }
}
2.2、具體實現步驟
第一步: 自定義key 實現scala.math.Ordered介面,和Serializable介面 第二步:將要進行二次排序的資料載入,按照<key,value>格式的RDD 第三步:使用sortByKey 基於自定義的key進行二次排序 第四步:去掉排序的key,只保留排序的結果
2.2.1、第二步:將要進行二次排序的資料載入,按照<key,value>格式的RDD
JavaPairRDD<MyKey, String> mykeyPairs = lines.mapToPair(new PairFunction<String, MyKey, String>() { private static final long serialVersionUID = 1L; public Tuple2<MyKey, String> call(String line) throws Exception { int firstKey = Integer.valueOf(line.split(" ")[0]); int secondKey = Integer.valueOf(line.split(" ")[1]); MyKey mykey = new MyKey(firstKey, secondKey); return new Tuple2<MyKey, String>(mykey, line); } });
2.2.2、第三步:使用sortByKey 基於自定義的key進行二次排序
JavaPairRDD<MyKey, String> sortPairs = mykeyPairs.sortByKey();
2.2.3、第四步:去掉排序的key,只保留排序的結果
JavaRDD<String> result = sortPairs.map(new Function<Tuple2<MyKey,String>, String>() { private static final long serialVersionUID = 1L; public String call(Tuple2<MyKey, String> tuple) throws Exception { return tuple._2;//line } }); //列印排序好的結果 result.foreach(new VoidFunction<String>() { private static final long serialVersionUID = 1L; public void call(String line) throws Exception { System.out.println(line); } });
三、完整程式碼
package com.chb.sparkDemo.secondarySort;
import io.netty.handler.codec.http.HttpContentEncoder.Result;
import java.awt.image.RescaleOp;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * Spark secondary-sort example (Java API). Steps:
 *   1. define a custom key implementing scala.math.Ordered and Serializable
 *   2. load the data as a <key, value> pair RDD
 *   3. sort with sortByKey, driven by the custom key's ordering
 *   4. drop the key, keeping only the sorted lines
 *
 * @author 12285
 */
public class SecordSortTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
        // JavaSparkContext wraps the underlying SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            // Read the input file; each RDD element is one text line
            JavaRDD<String> lines = jsc.textFile("C:\\Users\\12285\\Desktop\\test");
            // Step 2: map every line to <MyKey, line>
            JavaPairRDD<MyKey, String> mykeyPairs = lines.mapToPair(new PairFunction<String, MyKey, String>() {
                private static final long serialVersionUID = 1L;

                public Tuple2<MyKey, String> call(String line) throws Exception {
                    // split once instead of once per field (original split twice)
                    String[] parts = line.split(" ");
                    int firstKey = Integer.valueOf(parts[0]);
                    int secondKey = Integer.valueOf(parts[1]);
                    return new Tuple2<MyKey, String>(new MyKey(firstKey, secondKey), line);
                }
            });
            // Step 3: secondary sort driven by MyKey's comparison methods
            JavaPairRDD<MyKey, String> sortPairs = mykeyPairs.sortByKey();
            // Step 4: drop the key, keep only the original line
            JavaRDD<String> result = sortPairs.map(new Function<Tuple2<MyKey, String>, String>() {
                private static final long serialVersionUID = 1L;

                public String call(Tuple2<MyKey, String> tuple) throws Exception {
                    return tuple._2; // the untouched input line
                }
            });
            // Print the sorted result
            result.foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;

                public void call(String line) throws Exception {
                    System.out.println(line);
                }
            });
        } finally {
            // Fix: the original never stopped the context, leaking its resources
            jsc.stop();
        }
    }
}
結果:
1 12
1 23
2 11
3 22
3 31
4 45
四、使用scala實現
4.1、自定義key
/**
 * Composite key for Spark secondary sort: orders by `firstKey`,
 * breaking ties with `secondKey`. Mixes in `Ordered` so `sortByKey`
 * can compare keys and `Serializable` so Spark can ship them.
 */
class SecordSortKey(val firstKey: Int, val secondKey: Int) extends Ordered[SecordSortKey] with Serializable {
  override def compare(that: SecordSortKey): Int = {
    // Fix: the original used subtraction (firstKey - that.firstKey), which
    // overflows for large-magnitude values and inverts the ordering.
    // Integer.compare is overflow-safe.
    val byFirst = Integer.compare(this.firstKey, that.firstKey)
    if (byFirst != 0) byFirst
    else Integer.compare(this.secondKey, that.secondKey)
  }
}
4.2、具體實現
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * Spark secondary-sort example (Scala API): load lines, key them with
 * SecordSortKey, sort ascending, then print the sorted lines.
 */
object SecordSortTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SecordSort")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("C:\\Users\\12285\\Desktop\\test")
      // Step 2: pair every line with its composite sort key
      val pairSortKey = lines.map { line =>
        val fields = line.split(" ") // split once instead of once per field
        (new SecordSortKey(fields(0).toInt, fields(1).toInt), line)
      }
      // Step 3: ascending secondary sort.
      // Fix: the original passed `false` (descending), contradicting both the
      // documented ascending results and the Java version of this example.
      val sortPair = pairSortKey.sortByKey()
      // Step 4: drop the key, keep the line.
      val sortResult = sortPair.map(_._2)
      // Fix: use println — `print` ran all output together on one line,
      // unlike the documented one-record-per-line result.
      sortResult.collect().foreach(println)
    } finally {
      // Fix: the original never stopped the context, leaking its resources
      sc.stop()
    }
  }
}