Spark 使用sortByKey進行二次排序
阿新 • • 發佈:2019-02-01
Spark的sortByKey API允許自定義排序規則,這樣就可以進行自定義的二次排序、三次排序等等。
先來看一下sortByKey的原始碼實現:
// Zero-argument overload: delegates with ascending = true (default sort order).
def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)
// Boolean overload: builds a Comparator from Guava's natural ordering
// (cast to the key type K) and delegates to the Comparator overload.
def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
sortByKey(comp, ascending)
}
// Most general overload: the supplied Comparator is made an implicit
// Ordering so it satisfies OrderedRDDFunctions' context bound on K.
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
}
// Declaration of OrderedRDDFunctions (quoted from Spark): note the
// `K : Ordering` context bound — a custom key must supply an Ordering
// (e.g. by extending Ordered), and the class itself is Serializable.
class OrderedRDDFunctions[K : Ordering : ClassTag,
V: ClassTag,
P <: Product2[K, V] : ClassTag] @DeveloperApi () (
self: RDD[P])
extends Logging with Serializable
通過程式碼我們可以發現要實現自定義的二次排序,則Key必須實現Scala的Ordered特質和Java的Serializable介面。
Java實現:
首先是Key類的自定義實現:
import scala.math.Ordered;
import java.io.Serializable;
/**
 * Composite key for Spark secondary sort: orders by {@code first},
 * breaking ties with {@code second}.
 *
 * Implements Scala's {@code Ordered} trait (required by sortByKey's
 * implicit Ordering) and {@code Serializable} (keys cross the shuffle).
 * Created by Administrator on 2016/8/14 0014.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    // The two components being sorted on.
    private int first;
    private int second;

    /** Public constructor taking both sort components. */
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public int getSecond() {
        return second;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /** Keys are equal only when both components match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SecondarySortKey that = (SecondarySortKey) o;
        return first == that.first && second == that.second;
    }

    @Override
    public int hashCode() {
        // Must remain consistent with equals().
        return 31 * first + second;
    }

    /** Scala Ordered.compare: 1 / -1 / 0 for greater / less / equal. */
    @Override
    public int compare(SecondarySortKey other) {
        if (this.$greater(other)) {
            return 1;
        } else if (this.$less(other)) {
            return -1;
        }
        return 0;
    }

    /** Strictly less-than: by first, then by second as tie-breaker. */
    @Override
    public boolean $less(SecondarySortKey other) {
        if (this.first < other.first) {
            return true;
        }
        return this.first == other.first && this.second < other.second;
    }

    /** Strictly greater-than: by first, then by second as tie-breaker. */
    @Override
    public boolean $greater(SecondarySortKey other) {
        if (this.first > other.first) {
            return true;
        }
        // BUG FIX: original compared this.second with other.first.
        return this.first == other.first && this.second > other.second;
    }

    /** Less-than-or-equal: strict less-than, or both components equal. */
    @Override
    public boolean $less$eq(SecondarySortKey other) {
        return this.$less(other)
                || (this.first == other.first && this.second == other.second);
    }

    /** Greater-than-or-equal: strict greater-than, or both components equal. */
    @Override
    public boolean $greater$eq(SecondarySortKey other) {
        return this.$greater(other)
                || (this.first == other.first && this.second == other.second);
    }

    /** java.lang.Comparable entry point; delegates to compare. */
    @Override
    public int compareTo(SecondarySortKey other) {
        return this.compare(other);
    }
}
二次排序:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
 * Secondary-sort driver: reads lines of "first second" integer pairs,
 * sorts them by first then second via the composite key, and prints the
 * original lines in sorted order.
 * Created by Administrator on 2016/8/14 0014.
 */
public class SecondarySortApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("/home/resources/helloSpark.txt");
        // Pair each line with its composite key so sortByKey can order it.
        // FIX: the key class is SecondarySortKey, not SecondarySort.
        JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
            @Override
            public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                String[] splited = line.split(" ");
                SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]), Integer.valueOf(splited[1]));
                return new Tuple2<SecondarySortKey, String>(key, line);
            }
        });
        JavaPairRDD<SecondarySortKey, String> sorted = pairs.sortByKey(); // secondary sort happens here
        // Drop the key; keep only the original line text.
        JavaRDD<String> result = sorted.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
            @Override
            public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {
                return v1._2;
            }
        });
        for (String s : result.collect()) {
            System.out.println(s);
        }
        sc.stop();
    }
}
Scala 版本實現
Key:
package com.spark.App
/**
 * Composite key for secondary sort: orders by `first`, breaking ties
 * with `second`. Extends `Ordered` so `sortByKey` finds an implicit
 * Ordering, and `Serializable` because keys cross the shuffle.
 * Created by Administrator on 2016/8/14 0014.
 */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  // Idiomatic two-level comparison: compare `first`; only on a tie fall
  // back to `second`. Int.compareTo returns exactly -1 / 0 / 1, matching
  // the original's explicit literals, so behavior is unchanged.
  override def compare(other: SecondarySortKey): Int = {
    val primary = this.first.compareTo(other.first)
    if (primary != 0) primary else this.second.compareTo(other.second)
  }
}
二次排序:
package com.spark.App
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Secondary-sort driver (Scala): reads "first second" integer pairs,
 * sorts by the composite key, and prints the original lines.
 * Created by Administrator on 2016/8/14 0014.
 */
object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("/home/resources/helloSpark.txt")
    // Pair each line with its composite key so sortByKey can order it.
    val pairRDD = lines.map(line => {
      val splited = line.split(" ")
      val key = new SecondarySortKey(splited(0).toInt, splited(1).toInt)
      (key, line)
    })
    // false => descending order (as in the original article).
    val sorted = pairRDD.sortByKey(false)
    val result = sorted.map(item => item._2)
    result.collect().foreach(println)
    // FIX: release the SparkContext, matching the Java version's sc.stop().
    sc.stop()
  }
}