Implementing Secondary Sort with Hadoop and with Spark
0. Preface (note: this part has little to do with the title; feel free to skip straight to Part I)
If it is useless, why have a preface at all? Because I don't want to open a separate post just to jot down a few thoughts and plans, so I'll put them here:
A while ago I bought a few books, intending to study big data, but internships, exams, and my graduation project left me no time. Now that winter break has started, I can finally settle down and learn something useful.
The algorithm in this post comes from the book Data Algorithms: Recipes for Scaling Up with Hadoop and Spark. The book's code has the right idea but is incomplete, and it only provides a Java version, so I added a Scala version on top of it, written, naturally, for the Spark part.
Enough chatter; on to the main topic.
I. Input, Expected Output, and Approach
The input is SecondarySort.txt, with the following contents:
2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,07,30
2000,11,24,-40
2012,12,21,30
2012,12,22,-20
2012,12,23,60
2012,12,24,70
2012,12,25,10
2013,01,23,90
2013,01,24,70
2013,01,20,-10
The fields mean:
year,month,day,temperature
Expected output:
2013-01 90,70,-10
2012-12 70,60,30,10,-20
2000-12 10,-20
2000-11 30,20,-40
The fields mean:
year-month temperature1,temperature2,temperature3,...
with the year-month keys sorted in descending order from top to bottom,
and the temperatures sorted in descending order from left to right.
Approach:
Discard the day field, which is not needed in the output.
Use the year and month as a composite key, compare the keys, and sort them in descending order.
For each year-month key, sort the corresponding temperature values in descending order and concatenate them.
The tricky part in MapReduce is the last step: the framework never sorts the values of a key, so the temperature is folded into the composite key as well (the DateTemperaturePair below), and a custom partitioner plus a grouping comparator make sure that all records of the same year-month still arrive at a single reduce() call, with their temperatures already in descending order. For example, the record 2000,11,07,30 becomes the intermediate key (2000-11, 30) with value 30.
II. Implementing Secondary Sort in Java with MapReduce
The classes to implement are:
besides the usual SecondarySortingMapper, SecondarySortingReducer, and SecondarySortDriver,
two additional plug-in classes (DateTemperatureGroupingComparator and DateTemperaturePartitioner) and one custom key type (DateTemperaturePair).
The code follows (note that I removed the package declaration from every file, so add your own if you want to use it):
SecondarySortDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        Job job = Job.getInstance(configuration, "SecondarySort");
        job.setJarByClass(SecondarySortDriver.class);
        job.setJobName("SecondarySort");

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Map output key/value types
        job.setMapOutputKeyClass(DateTemperaturePair.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reduce output key/value types (the reducer emits a Text value, not an IntWritable)
        job.setOutputKeyClass(DateTemperaturePair.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SecondarySortingMapper.class);
        job.setReducerClass(SecondarySortingReducer.class);
        job.setPartitionerClass(DateTemperaturePartitioner.class);
        job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);

        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException(
                    "Usage: SecondarySortDriver <input-path> <output-path>");
        }
        int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
        System.exit(returnStatus);
    }
}
DateTemperaturePair.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DateTemperaturePair implements WritableComparable<DateTemperaturePair> {

    private String yearMonth;
    private String day;
    protected Integer temperature;

    // Natural ordering of the composite key: year-month first, then temperature,
    // negated so that the shuffle sorts in descending order.
    @Override
    public int compareTo(DateTemperaturePair o) {
        int compareValue = this.yearMonth.compareTo(o.getYearMonth());
        if (compareValue == 0) {
            compareValue = temperature.compareTo(o.getTemperature());
        }
        return -1 * compareValue;
    }

    // Only year-month and temperature are serialized; the day is not needed
    // after the map phase, so it is not written across the shuffle.
    public void write(DataOutput dataOutput) throws IOException {
        Text.writeString(dataOutput, yearMonth);
        dataOutput.writeInt(temperature);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.yearMonth = Text.readString(dataInput);
        this.temperature = dataInput.readInt();
    }

    @Override
    public String toString() {
        return yearMonth;
    }

    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String text) {
        this.yearMonth = text;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    public Integer getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
}
SecondarySortingMapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SecondarySortingMapper extends
        Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line: YYYY,MM,DD,temperature
        String[] tokens = value.toString().split(",");
        String yearMonth = tokens[0] + "-" + tokens[1];
        String day = tokens[2];
        int temperature = Integer.parseInt(tokens[3]);

        // Composite key: (year-month, temperature); the temperature is also emitted as the value.
        DateTemperaturePair reduceKey = new DateTemperaturePair();
        reduceKey.setYearMonth(yearMonth);
        reduceKey.setDay(day);
        reduceKey.setTemperature(temperature);
        context.write(reduceKey, new IntWritable(temperature));
    }
}
DateTemperaturePartitioner.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class DateTemperaturePartitioner extends
        Partitioner<DateTemperaturePair, IntWritable> {

    @Override
    public int getPartition(DateTemperaturePair dateTemperaturePair, IntWritable temperature,
                            int numberOfPartitions) {
        // Partition by year-month only, so that all records of the same month
        // go to the same reducer regardless of their temperature.
        return Math.abs(dateTemperaturePair.getYearMonth().hashCode() % numberOfPartitions);
    }
}
DateTemperatureGroupingComparator.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DateTemperatureGroupingComparator extends WritableComparator {

    public DateTemperatureGroupingComparator() {
        super(DateTemperaturePair.class, true);
    }

    // Group keys by year-month only, ignoring the temperature, so that one
    // reduce() call receives all temperatures of a month.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        DateTemperaturePair pair1 = (DateTemperaturePair) a;
        DateTemperaturePair pair2 = (DateTemperaturePair) b;
        return pair1.getYearMonth().compareTo(pair2.getYearMonth());
    }
}
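How the plug-in classes work together: the composite keys are sorted by DateTemperaturePair.compareTo (year-month descending, then temperature descending), the partitioner sends every record of a given month to the same reducer, and the grouping comparator makes the framework treat all keys sharing a year-month as a single group. The net effect is that each reduce() call below receives the temperatures of one month already in descending order and only needs to concatenate them.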
SecondarySortingReducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SecondarySortingReducer extends
        Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {

    @Override
    protected void reduce(DateTemperaturePair key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        // The values arrive already sorted in descending order; just join them with commas.
        StringBuilder sortedTemperatureList = new StringBuilder();
        for (IntWritable temperature : values) {
            sortedTemperatureList.append(temperature);
            sortedTemperatureList.append(",");
        }
        sortedTemperatureList.deleteCharAt(sortedTemperatureList.length() - 1);
        context.write(key, new Text(sortedTemperatureList.toString()));
    }
}
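To run the Java version, package the classes above into a jar (with your own package declaration added) and submit it with the standard hadoop jar command, passing the input and output paths as the two arguments SecondarySortDriver expects, e.g. hadoop jar secondary-sort.jar SecondarySortDriver /input/SecondarySort.txt /output (the jar name and paths here are only placeholders).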
III. Implementing Secondary Sort with Spark in Scala
This version is, as you would expect, much more concise. Here it is:
SecondarySort.scala
package spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object SecondarySort {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SecondarySort").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //val file = sc.textFile("hdfs://localhost:9000/Spark/SecondarySort/Input/SecondarySort2.txt")
    val file = sc.textFile("e:\\SecondarySort.txt")

    // Key: (year, month); value: temperature parsed as Int so it sorts numerically.
    val rdd = file.map(line => line.split(","))
      .map(x => ((x(0), x(1)), x(3).toInt))
      .groupByKey()
      .sortByKey(false)                                                  // year-month descending
      .map(x => (x._1._1 + "-" + x._1._2, x._2.toList.sortWith(_ > _))) // temperatures descending

    rdd.foreach { x =>
      val buf = new StringBuilder()
      for (a <- x._2) {
        buf.append(a)
        buf.append(",")
      }
      buf.deleteCharAt(buf.length() - 1)
      println(x._1 + " " + buf.toString())
    }
    sc.stop()
  }
}
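A final note, plus a rough alternative sketch that is my own addition rather than code from the listing above: groupByKey has to hold all temperatures of a month in memory at once, which is fine for this toy input but does not scale to large groups. Spark can instead sort a composite (yearMonth, temperature) key during the shuffle with repartitionAndSortWithinPartitions, much like the MapReduce version does. The file name SecondarySortShuffle.scala, the class YearMonthPartitioner, and the reuse of the local input path are assumptions of this sketch, and the result is printed one record per line rather than one comma-joined line per month.
SecondarySortShuffle.scala
package spark

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions

// Routes a composite key (yearMonth, temperature) by yearMonth only,
// mirroring DateTemperaturePartitioner in the MapReduce version.
class YearMonthPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (yearMonth: String, _) => (yearMonth.hashCode & Int.MaxValue) % partitions
    case _ => 0
  }
}

object SecondarySortShuffle {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("SecondarySortShuffle").setMaster("local"))
    val file = sc.textFile("e:\\SecondarySort.txt") // same sample input as above

    // Composite key (yearMonth, temperature) with a dummy value; the shuffle
    // itself sorts the keys, instead of sorting lists in memory after groupByKey.
    val pairs = file.map(_.split(","))
      .map(x => ((x(0) + "-" + x(1), x(3).toInt), 1))

    // Descending order on both parts of the composite key.
    implicit val ord: Ordering[(String, Int)] =
      Ordering.Tuple2(Ordering.String.reverse, Ordering.Int.reverse)

    val sorted = pairs.repartitionAndSortWithinPartitions(new YearMonthPartitioner(1))

    // Records are now in their final order; printed one per line.
    sorted.map { case ((yearMonth, temperature), _) => yearMonth + " " + temperature }
      .foreach(println)

    sc.stop()
  }
}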