MapReduce Examples (Part 2)
By 阿新, published 2019-01-07
3. Total Sort
Design idea: to get a totally sorted result we could rely on the sorting that each map task already performs on its own input and then do the global sort in a single reduce task, but that design limits the number of reduce tasks to 1 and offers little parallelism. To address this, we sort across multiple partitions instead, using Hadoop's built-in TotalOrderPartitioner to create the partitions for the sort job. Before the partitions can be created, the input must first be sampled with one of Hadoop's samplers, and a partition file is generated from the observed key distribution; the key range of each partition is specified by this partition file. This guarantees that every reduce task processes a contiguous range of keys.
package com.hadoop.totalsort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalSort {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        // Path of the partition file
        Path partitionFile = new Path(args[2]);
        // Number of reduce tasks
        int reduceNumber = Integer.parseInt(args[3]);

        /* RandomSampler parameters:
         *   1st: probability that a record is selected
         *   2nd: total number of samples to take
         *   3rd: maximum number of InputSplits to read
         */
        RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);

        Configuration conf = new Configuration();
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        Job job = new Job(conf);
        job.setJobName("TotalSort");
        job.setJarByClass(TotalSort.class);
        // KeyValueTextInputFormat splits each line into key and value on '\t' by default
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(reduceNumber);
        job.setPartitionerClass(TotalOrderPartitioner.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete the output directory if it already exists
        outputPath.getFileSystem(conf).delete(outputPath, true);

        // Sample the input and write the partition file before submitting the job
        InputSampler.writePartitionFile(job, sampler);
        System.out.println(job.waitForCompletion(true) ? 0 : 1);
    }
}
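Why this produces a globally sorted result: the partition file holds N-1 sampled split points for N reducers, and the partitioner sends every key to the reducer whose range contains it, so reducer i only receives keys that sort before those of reducer i+1, and concatenating part-r-00000 through part-r-0000(N-1) gives one fully ordered file. The standalone snippet below is only a conceptual sketch of that range lookup, with made-up split points; it is not Hadoop's actual TotalOrderPartitioner implementation.

import java.util.Arrays;

// Conceptual sketch of range partitioning with sampled split points.
public class RangePartitionSketch {

    // Hypothetical split points; in a real job they come from the partition
    // file written by InputSampler.writePartitionFile.
    private static final String[] SPLIT_POINTS = {"g", "n", "t"}; // 4 partitions

    static int getPartition(String key) {
        int pos = Arrays.binarySearch(SPLIT_POINTS, key);
        // On a miss, binarySearch returns -(insertionPoint) - 1; the insertion
        // point is exactly the index of the range the key falls into.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        for (String k : new String[] {"apple", "hadoop", "spark", "zookeeper"}) {
            System.out.println(k + " -> partition " + getPartition(k));
        }
    }
}

Running it prints apple -> partition 0, hadoop -> partition 1, spark -> partition 2 and zookeeper -> partition 3: each partition covers a contiguous key range.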
4. Merging Many Small Files
As mentioned in an earlier post, HDFS is designed for storing large data sets; a very large number of small files wastes resources (NameNode memory, plus one map task per file). We can use CombineTextInputFormat.setMaxInputSplitSize to cap the split size so that many small files are packed into a few large splits and, with a map-only job, rewritten as large files.
package com.hadoop.combinefile;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SmallFileCombinerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final NullWritable v = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Pass each line through unchanged; the merging is done by the input format
        context.write(value, v);
    }
}
package com.hadoop.combinefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {

    public static void main(String[] args)
            throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "smallFileCombine");
        job.setJarByClass(Driver.class);
        job.setMapperClass(SmallFileCombinerMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Pack many small files into a few splits
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Maximum split size: small files are combined until a split reaches 150 MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024 * 150);
        CombineTextInputFormat.setInputPaths(job, new Path("hdfs://10.128.0.130:9000/user/root/CombineInput"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://10.128.0.130:9000/user/root/output9"));

        // Map-only job: no reduce phase is needed
        job.setNumReduceTasks(0);
        System.out.println(job.waitForCompletion(true) ? 0 : 1);
    }
}
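To check that the small files really will be packed together before running the full job, you can ask the input format how many splits it would produce. The snippet below is a sketch of my own, assuming the same HDFS address and input directory as the driver above are reachable from where it runs:

package com.hadoop.combinefile;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class SplitInspector {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "splitInspector");
        // Same 150 MB ceiling as the driver above
        CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024 * 150L);
        CombineTextInputFormat.setInputPaths(job,
                new Path("hdfs://10.128.0.130:9000/user/root/CombineInput"));
        // getSplits shows how the files would be grouped, without submitting a job
        List<InputSplit> splits = new CombineTextInputFormat().getSplits(job);
        System.out.println("Number of splits: " + splits.size());
    }
}

With, say, a hundred 1 MB files and the 150 MB ceiling, this should typically report a single split, which the map-only job above then writes out as one part file.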
5. Data Deduplication
The goal of data deduplication is to remove duplicate records from the input. In MapReduce, map output is grouped by key during the shuffle into <key, value-list> pairs, so all duplicates of a record arrive at a single reduce call. We therefore emit each record as the key in the map phase (the value can be anything), and in the reduce phase emit only the key with an empty value. For example, if the input contains the lines a, b, a, c, the shuffle groups them under the keys a, b and c, and the reducer writes each of these exactly once.
package com.hadoop.DeWeighting;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DeMap extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Emit the whole line as the key so that duplicates collapse in the shuffle
        context.write(value, new Text(""));
    }
}
package com.hadoop.DeWeighting;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DeReduce extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Write each distinct key once; the value stays empty
        context.write(key, new Text(""));
    }
}
package com.hadoop.DeWeighting;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "DeWeight");
        job.setJarByClass(Driver.class);
        job.setMapperClass(DeMap.class);
        job.setReducerClass(DeReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output directories on HDFS
        FileInputFormat.addInputPath(job, new Path("hdfs://10.128.0.130:9000/user/root/DeWeighting"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://10.128.0.130:9000/user/root/output10"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
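Writing new Text("") as the value works, but it still serializes an empty string for every record. A common alternative, in line with the small-file example above, is NullWritable. The classes below are a minimal variant sketch of my own, not part of the original post:

package com.hadoop.DeWeighting;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupNullWritable {

    public static class DedupMapper extends Mapper<Object, Text, Text, NullWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The whole line becomes the key, so duplicates collapse in the shuffle
            context.write(value, NullWritable.get());
        }
    }

    public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Emit each distinct line exactly once
            context.write(key, NullWritable.get());
        }
    }
}

The driver would then declare NullWritable as the output value class via job.setOutputValueClass(NullWritable.class).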