
MapReduce Examples (Part 2)

3. Total Order Sort

Design idea: A totally ordered result can be produced by letting the map output flow into a single reducer, which sorts its entire input, but that design limits the number of Reduce tasks to 1 and gives poor parallelism. To address this, we sort by partitions instead, using Hadoop's built-in TotalOrderPartitioner to create the partitions for the sort job. Before the partitions can be created, one of Hadoop's samplers samples the input and generates a partition file that reflects the key distribution; the key range of each partition is taken from that file. This guarantees that every Reduce task processes a contiguous range of keys, so the reducer outputs, taken in order, form a globally sorted data set.

package com.hadoop.totalsort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalSort {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);
		// path of the partition file used by TotalOrderPartitioner
		Path partitionFile = new Path(args[2]);
		// number of reduce tasks
		int reduceNumer = Integer.parseInt(args[3]);
		/* RandomSampler parameters:
		 *   1st: probability with which each record is selected
		 *   2nd: number of samples to collect
		 *   3rd: maximum number of InputSplits to read
		 */
		RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text,Text>(0.1, 10000, 10);	
		Configuration conf = new Configuration();	
		TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

		Job job = new Job(conf);
		job.setJobName("TotalSort");
		job.setJarByClass(TotalSort.class);
		// KeyValueTextInputFormat splits each input line into key and value on \t by default
		job.setInputFormatClass(KeyValueTextInputFormat.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setNumReduceTasks(reduceNumer);	
		job.setPartitionerClass(TotalOrderPartitioner.class);
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		// delete the output directory if it already exists
		outputPath.getFileSystem(conf).delete(outputPath,true);	
		InputSampler.writePartitionFile(job, sampler);
		System.out.println(job.waitForCompletion(true)?0:1);	
	}
}
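RandomSampler is not the only sampler Hadoop ships. If the keys are already spread fairly evenly across the input, InputSampler.SplitSampler, which simply takes the first records from a limited number of splits, is a cheaper choice because it does not scan whole splits. A minimal sketch of swapping it into the job above (the parameter values are illustrative, not tuned):

		// Alternative sampler: take up to 10000 records from at most 10 splits.
		// Cheaper than RandomSampler, but only representative if keys are evenly distributed.
		InputSampler.Sampler<Text, Text> sampler =
				new InputSampler.SplitSampler<Text, Text>(10000, 10);
		// The partition file is still written the same way, after the input paths are set.
		InputSampler.writePartitionFile(job, sampler);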

4. Merging Many Small Files

As noted in an earlier post, HDFS is designed for storing large data sets; a very large number of small files wastes resources (NameNode memory, plus one map task per file). We can therefore switch the job to CombineTextInputFormat and set the split size with CombineTextInputFormat.setMaxInputSplitSize, so that many small files are merged into larger output files.

package com.hadoop.combinefile;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SmallFileCombinerMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

	NullWritable v = NullWritable.get();

	// Pass each line through unchanged; with zero reduce tasks the map output is
	// written straight to the output files, which merges the small input files.
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		context.write(value, v);
	}
}
package com.hadoop.combinefile;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();	
		Job job = new Job(conf,"smallFileCombine");
		job.setJarByClass(Driver.class);
		job.setMapperClass(SmallFileCombinerMapper.class);	
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		// use CombineTextInputFormat so that many small files are packed into a single split
		job.setInputFormatClass(CombineTextInputFormat.class);
		// set the maximum split size: files are combined until a split reaches 150 MB
		CombineTextInputFormat.setMaxInputSplitSize(job, 1024*1024*150);
		CombineTextInputFormat.setInputPaths(job, new Path("hdfs://10.128.0.130:9000/user/root/CombineInput"));
		
		FileOutputFormat.setOutputPath(job, new Path("hdfs://10.128.0.130:9000/user/root/output9"));
		job.setNumReduceTasks(0);
		System.out.println("OK");
		System.out.println(job.waitForCompletion(true) ? 0 : 1);
	}
}
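To see how the maximum split size actually affects combining, it can help to print the number of splits before running the full job. The class below is a small sketch written for this setup (the class name SplitCountCheck and the args[0] input path are illustrative, not part of the original example); it reuses the same CombineTextInputFormat settings and calls getSplits() directly, so the printed count corresponds to the number of map tasks, and therefore output files, the job would produce.

package com.hadoop.combinefile;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class SplitCountCheck {

	public static void main(String[] args) throws IOException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "splitCountCheck");

		// Same settings as the driver above: pack small files into splits of at most 150 MB.
		job.setInputFormatClass(CombineTextInputFormat.class);
		CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024 * 150);
		CombineTextInputFormat.setInputPaths(job, new Path(args[0]));

		// getSplits() reports how many splits (and therefore map tasks) the job would get.
		List<InputSplit> splits = new CombineTextInputFormat().getSplits(job);
		System.out.println("Number of combined splits: " + splits.size());
	}
}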

5. Data Deduplication

The goal of deduplication is to remove duplicate records from a file. In MapReduce, the map output is grouped by the shuffle into <key, value-list> pairs, so it is enough to emit each record as the map output key (the value can be anything) and, in the reduce phase, write out only the key with an empty value.

package com.hadoop.DeWeighting;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DeMap extends Mapper<Object, Text, Text, Text>{

	// Emit the whole input line as the key so that identical lines meet at the same reducer;
	// the value is irrelevant, so an empty Text is written.
	@Override
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		context.write(value, new Text(""));
	}
}
package com.hadoop.DeWeighting;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DeReduce extends Reducer<Text, Text, Text, Text>{

	// Each distinct key reaches reduce() exactly once, so writing only the key
	// yields the deduplicated output.
	@Override
	public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
		context.write(key, new Text(""));
	}
}
package com.hadoop.DeWeighting;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Driver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = new Job(conf,"DeWeight");
		job.setJarByClass(Driver.class);
		job.setMapperClass(DeMap.class);	
		job.setReducerClass(DeReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path("hdfs://10.128.0.130:9000/user/root/DeWeighting"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://10.128.0.130:9000/user/root/output10"));
		System.out.println("OK");
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
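As a small variation on the classes above, the empty Text value can be replaced by NullWritable, which serializes to zero bytes and avoids creating a new Text object for every record. A sketch of the mapper under that change is shown below (DeMapNull is a name made up for this example; DeReduce and the driver would have to switch their value type to NullWritable accordingly).

package com.hadoop.DeWeighting;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Variant of DeMap: the line is still the key, but the value is NullWritable.
public class DeMapNull extends Mapper<Object, Text, Text, NullWritable> {

	@Override
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		// Duplicate lines collapse into a single key during the shuffle.
		context.write(value, NullWritable.get());
	}
}

In the driver, job.setOutputValueClass(NullWritable.class) would then replace Text.class.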