MapReduce Case 9: Sorting the Data of Multiple Number Files and Adding a Rank Index (with a Parallelizable Method)
Problem:
Sort the numbers and prepend a rank index to each. Source data:
2
32
654
32
15
756
65223
5956
22
650
92
26
54
6
Expected final result:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
Be sure to consider whether your approach still works once the data volume grows large.
Solution idea: several unordered files must be sorted, with an index prepended to each number, so the first version leans on MapReduce's default sorting. The mapper emits each number as an IntWritable key and the framework's shuffle sorts the keys; the reducer keeps a global counter for the index plus a temporary variable holding the previous value. If the current value is greater than the previous one the counter is incremented, otherwise it stays unchanged, and each (rank, value) pair is written out. Note this produces a dense ranking in which duplicates share an index, whereas the sample output above numbers the two 32s separately.
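Stripped of the Hadoop boilerplate, the ranking step is just a dense-rank counter over already-sorted values. A minimal standalone sketch of that logic (an illustration added here, not code from the original solution; the class name is arbitrary):

public class DenseRankSketch {
    public static void main(String[] args) {
        // Already-sorted input, mirroring what the shuffle guarantees the reducer.
        int[] sorted = {2, 6, 15, 22, 26, 32, 32, 54};
        int rank = 0;     // the global counter ("countnum" in the job code)
        int previous = 0; // the last value seen ("temNum" in the job code); assumes positive inputs
        for (int n : sorted) {
            if (n > previous) { // a new, larger value opens a new rank
                rank++;
                previous = n;
            }
            System.out.println(rank + "\t" + n); // duplicates (the two 32s) share a rank
        }
    }
}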
The code is as follows:
/**
 * @author: lpj
 * @date: March 16, 2018, 7:16:47 PM
 * @Description:
 */
package lpj.reduceWork;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class BigNumFileSortMR {
private static int countnum = 0; // global rank counter, shared across reduce() calls
private static int temNum = 0;   // last value seen
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// conf.addResource("hdfs-site.xml"); // use config files
// System.setProperty("HADOOP_USER_NAME", "hadoop"); // use the cluster
FileSystem fs = FileSystem.get(conf); // local filesystem by default
Job job = Job.getInstance(conf);
job.setJarByClass(BigNumFileSortMR.class);
job.setMapperClass(BigNumFileSortMR_Mapper.class);
job.setReducerClass(BigNumFileSortMR_Reducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
Path inputPath = new Path("d:/a/homework9/input/");   // read multiple files
Path outputPath = new Path("d:/a/homework9/output/"); // write one result file
if (fs.exists(outputPath)) { // delete the output directory if it already exists
fs.delete(outputPath, true);
}
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
boolean isdone = job.waitForCompletion(true);
System.exit(isdone ? 0 : 1);
}
public static class BigNumFileSortMR_Mapper extends Mapper<LongWritable, Text, IntWritable, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Emit each number as the key; the framework sorts IntWritable keys during the shuffle.
int num = Integer.parseInt(value.toString());
context.write(new IntWritable(num), NullWritable.get());
}
}
public static class BigNumFileSortMR_Reducer extends Reducer<IntWritable, NullWritable, Text, NullWritable>{
Text kout = new Text();
@Override
protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable niv : values) {
if (key.get() > temNum) {
countnum++;         // new, larger value: open a new rank
temNum = key.get(); // remember the value just ranked
}
String kk = countnum + "\t" + key.toString();
kout.set(kk);
context.write(kout, NullWritable.get());
}
}
}
}
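Note that countnum and temNum are static fields shared across reduce() calls. The ranking is only globally consistent because the job keeps the default single reduce task, which sees every key in sorted order inside one JVM; this is exactly the bottleneck the multi-reducer version further down removes.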
Test: the numbers given above were copied three times into the input directory as three input files. The result of the run (observe that duplicates share a rank, unlike the problem statement's sample output, which numbers every line):
1 2
1 2
1 2
2 6
2 6
2 6
3 15
3 15
3 15
4 22
4 22
4 22
5 26
5 26
5 26
6 32
6 32
6 32
6 32
6 32
6 32
7 54
7 54
7 54
8 92
8 92
8 92
9 650
9 650
9 650
10 654
10 654
10 654
11 756
11 756
11 756
12 5956
12 5956
12 5956
13 65223
13 65223
13 65223
When the input consists of large files, or of many files, a single reduce task is under too much output pressure, so a multi-reducer approach is used instead. The work is split into three chained jobs, outlined below.
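The data flow, reconstructed from the code that follows:

input files -> job 1: range-partition across 4 reducers, sort within each -> output1 (4 globally ordered files)
output1 -> job 2: count distinct values per partition file, derive each file's start index -> output2 (filename, count, start index)
output1 -> job 3: map-only; loads output2 from the distributed cache and numbers each file independently -> output3 (final indexed result)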
The code is as follows:
/**
* @author: lpj
* @date: March 16, 2018, 7:16:47 PM
* @Description:
*/
package lpj.reduceWork;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import lpj.reduceWorkbean.MyPatitionerBigFileSum;
public class BigNumFileSortMR2 extends Configured implements Tool{
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new BigNumFileSortMR2(), args);
System.exit(run);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
//---------------- Job 1: range-partition the numbers and sort within each partition ----------------
FileSystem fs = FileSystem.get(conf); // HDFS, per fs.defaultFS above
Job job = Job.getInstance(conf);
job.setJarByClass(BigNumFileSortMR2.class);
job.setMapperClass(BigNumFileSortMR_Mapper.class);
job.setReducerClass(BigNumFileSortMR_Reducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(MyPatitionerBigFileSum.class);
job.setNumReduceTasks(4);
Path inputPath = new Path("/a/homework9/input");    // multiple input files
Path outputPath = new Path("/a/homework9/output1"); // one sorted file per reducer (4 files)
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
//---------------- Job 2: count each partition file's distinct elements, keyed by file name ----------------
FileSystem fs2 = FileSystem.get(conf);
Job job2 = Job.getInstance(conf);
job2.setJarByClass(BigNumFileSortMR2.class);
job2.setMapperClass(BigNumFileSortMR2_Mapper.class);
job2.setReducerClass(BigNumFileSortMR2_Reducer.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(IntWritable.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
Path inputPath2 = new Path("/a/homework9/output1"); // job 1's sorted partition files
Path outputPath2 = new Path("/a/homework9/output2"); // per-file counts and start indices
if (fs2.exists(outputPath2)) {
fs2.delete(outputPath2, true);
}
FileInputFormat.setInputPaths(job2, inputPath2);
FileOutputFormat.setOutputPath(job2, outputPath2);
//---------------- Job 3: add the rank index (map-only) ----------------
FileSystem fs3 = FileSystem.get(conf);
Job job3 = Job.getInstance(conf);
job3.setJarByClass(BigNumFileSortMR2.class);
job3.setMapperClass(BigNumFileSortMR3_Mapper.class);
job3.setNumReduceTasks(0);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(NullWritable.class);
URI uri = new URI("/a/homework9/output2/part-r-00000"); // job 2 uses the default single reducer, so all counts land in this one file
job3.addCacheFile(uri);
Path inputPath3 = new Path("/a/homework9/output1"); // number job 1's sorted files
Path outputPath3 = new Path("/a/homework9/output3"); // final indexed output
if (fs3.exists(outputPath3)) {
fs3.delete(outputPath3, true);
}
FileInputFormat.setInputPaths(job3, inputPath3);
FileOutputFormat.setOutputPath(job3, outputPath3);
//--------------------------------------
ControlledJob aJob = new ControlledJob(job.getConfiguration());
ControlledJob bJob = new ControlledJob(job2.getConfiguration());
ControlledJob cJob = new ControlledJob(job3.getConfiguration());
aJob.setJob(job);
bJob.setJob(job2);
cJob.setJob(job3);
JobControl jc = new JobControl("jc");
jc.addJob(aJob);
jc.addJob(bJob);
jc.addJob(cJob);
bJob.addDependingJob(aJob); // job 2 runs after job 1
cJob.addDependingJob(bJob); // job 3 runs after job 2
Thread thread = new Thread(jc);
thread.start();
while (!jc.allFinished()) {
Thread.sleep(1000); // poll until all three jobs have finished
}
jc.stop();
return 0;
}
//---------------- Job 1's mapper and reducer: partition and sort ----------------
public static class BigNumFileSortMR_Mapper extends Mapper<LongWritable, Text, IntWritable, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Emit each number as the key; the shuffle sorts keys within each partition.
int num = Integer.parseInt(value.toString());
context.write(new IntWritable(num), NullWritable.get());
}
}
public static class BigNumFileSortMR_Reducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable>{
@Override
protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// Write each occurrence back out; within a partition the keys arrive sorted.
for (NullWritable niv : values) {
context.write(key, NullWritable.get());
}
}
}
//---------------- Job 2's mapper and reducer: count distinct elements per partition file ----------------
public static class BigNumFileSortMR2_Mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Set<Integer> numset = new HashSet<>(); // distinct values seen so far in this file
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
int num = Integer.parseInt(value.toString());
FileSplit fSplit = (FileSplit) context.getInputSplit();
String filename = fSplit.getPath().getName(); // name of the partition file being read
// The partitioner has already routed the ranges [0,100), [100,500), [500,1000)
// and [1000,...) to separate reducers, so every value is treated the same here.
numset.add(num);
// Emit the cumulative distinct count; the reducer keeps the largest value per file.
context.write(new Text(filename), new IntWritable(numset.size()));
}
}
public static class BigNumFileSortMR2_Reducer extends Reducer<Text, IntWritable, Text, Text>{
int index = 1; // running start index; valid because the single reducer sees file names in sorted order
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// The mapper emitted a growing cumulative count per record, and the shuffle does not
// guarantee value order, so the file's distinct count is the maximum of the group.
int count = 0;
for (IntWritable niv : values) {
count = Math.max(count, niv.get());
}
// Output line: filename \t distinct count \t start index
context.write(key, new Text(count + "\t" + index));
index += count;
}
}
//---------------- Job 3's mapper: load the per-file start indices into memory, then assign ranks ----------------
public static class BigNumFileSortMR3_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text kout = new Text();
Map<String, Integer> filecount = new HashMap<>(); // file name -> start index from job 2
int prevnum = 0; // previous value read from this file
int index = 0;   // current rank
int count = 0;   // records seen so far in this file
@SuppressWarnings("deprecation")
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// Read job 2's output (filename \t count \t start index) from the distributed cache.
Path[] paths = context.getLocalCacheFiles();
String str = paths[0].toUri().toString();
BufferedReader bf = new BufferedReader(new FileReader(new File(str)));
String readline = null;
while ((readline = bf.readLine()) != null) {
String[] split = readline.split("\t");
filecount.put(split[0], Integer.parseInt(split[2]));
}
IOUtils.closeStream(bf);
// Look up this file's start index. Note the assumption that each partition
// file is handled by a single map task; otherwise the numbering would break.
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String name = fileSplit.getPath().getName();
if (fileSplit.getLength() > 0) {
index = filecount.get(name);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
int num = Integer.parseInt(value.toString());
count++;
// Advance the rank only when a new value appears; duplicates keep the same rank.
if (count > 1 && num != prevnum) {
index++;
}
prevnum = num;
kout.set(index + "\t" + num);
context.write(kout, NullWritable.get());
}
}
// Unused: job 3 runs with zero reduce tasks (map-only), so this reducer is never invoked.
public static class BigNumFileSortMR3_Reducer extends Reducer<Text, IntWritable, Text, Text>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
}
}
}
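One piece is missing from the listing above: the custom partitioner lpj.reduceWorkbean.MyPatitionerBigFileSum. Its source is not included in the post, but the value ranges noted in BigNumFileSortMR2_Mapper (0-100, 100-500, 500-1000, and 1000 upward) suggest it routes keys to the four reducers by range. A minimal sketch under that assumption; the cut points are inferred, not the author's actual code:

package lpj.reduceWorkbean;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
// Hypothetical range partitioner: keeps the partition files globally ordered,
// i.e. every number in part-r-00000 is smaller than every number in part-r-00001, etc.
public class MyPatitionerBigFileSum extends Partitioner<IntWritable, NullWritable> {
@Override
public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
int num = key.get();
if (num < 100) return 0;  // [0, 100)
if (num < 500) return 1;  // [100, 500)
if (num < 1000) return 2; // [500, 1000)
return 3;                 // [1000, ...)
}
}

Range partitioning is what makes the start-index bookkeeping possible: since the partition files are globally ordered, each file's start index is one plus the sum of the distinct counts of all files before it.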
Summary: problems of this kind need not be solved in a single MapReduce job; once decomposed, they become simple and clear. For a global sort over small files with little data, the first method is the right choice; for large-scale data, the second method is the more general one.
The difficulty in parallelizing lies in recording, for each sorted partition file, its number of distinct elements and its starting index, after which every file can be numbered independently.
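A hypothetical example of that bookkeeping: if the four sorted partition files contain 8, 3, 2 and 1 distinct values respectively, job 2 assigns them start indices 1, 1+8=9, 9+3=12 and 12+2=14, and job 3's map tasks can then number all four files independently, in parallel, from those offsets.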