
Map-side join in MapReduce
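In a map-side join, the small table is shipped to every map task through the DistributedCache and loaded into memory in setup(); each map() call then probes that in-memory table, so no reduce phase (and no shuffle) is needed. The listing below assumes records in both tables are delimited by ":::". As a hypothetical illustration (the file names and records here are invented, not from the original data):

author file (the small table, cached; authorId:::authorName):
    a001:::Tom White
    a002:::Doug Cutting

input file (the large table; bookId:::authorId):
    b101:::a002
    b102:::a001

For every input record whose authorId is present in the cached map, the mapper emits (bookId, authorName); unmatched records are dropped, i.e. this is an inner join.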

package com.inspur.mapreduce.join;

/*************************************
 * @author:	caolch
 * @date:	2013-12-31
 * @note:	A table join implemented entirely in the mapper;
 *          the small table is loaded into memory via the DistributedCache
 *************************************/

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoin extends Configured implements Tool {

	public static class MyMapper extends Mapper<Object, Text, Text, Text> {
		// In-memory copy of the small (author) table, populated in setup()
		private HashMap<String, String> authorMap = new HashMap<String, String>();

		@Override
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// Each input line is expected to look like "id:::joinKey"
			String[] tokens = value.toString().split(":::");
			if (tokens.length < 2) {
				return;	// skip malformed lines
			}
			String joinData = authorMap.get(tokens[1]);

			// Emit only records with a match in the small table (inner join)
			if (joinData != null) {
				context.write(new Text(tokens[0]), new Text(joinData));
			}
		}

		// setup() runs once per task, before any calls to map()
		@Override
		public void setup(Context context) throws IOException,
				InterruptedException {
			// Get the local paths of the files placed in the distributed cache
			Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

			// Load the small table from the cached file(s) into the in-memory map
			if (cacheFiles != null && cacheFiles.length > 0) {
				String line;
				String[] tokens;
				for (Path path : cacheFiles) {
					if (path.toString().contains("author")) {
						BufferedReader br = new BufferedReader(new FileReader(path.toString()));
						try {
							while ((line = br.readLine()) != null) {
								// Split into key and the rest of the line: "key:::value"
								tokens = line.split(":::", 2);
								if (tokens.length == 2) {
									authorMap.put(tokens[0], tokens[1]);
								}
							}
						} finally {
							br.close();
						}
					}
				}
			}
		}
		
	}
	
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = new Job(conf, "MapJoin");
		job.setJarByClass(MapJoin.class);
		job.setMapperClass(MyMapper.class);
		// Map-only job: the join happens entirely on the map side
		job.setNumReduceTasks(0);
		
		/* Register the small table file(s) with the distributed cache */
		Path cacheFilePath = new Path(args[0]);
		FileSystem hdfs = FileSystem.get(conf);
		FileStatus fileStatus = hdfs.getFileStatus(cacheFilePath);
		// args[0] may be a single file or a directory
		if (!fileStatus.isDir()) {
			// A single file: add it to the cache directly
			DistributedCache.addCacheFile(cacheFilePath.toUri(), job.getConfiguration());
		} else {
			// A directory: list it and add every file inside
			for (FileStatus fs : hdfs.listStatus(cacheFilePath)) {
				DistributedCache.addCacheFile(fs.getPath().toUri(), job.getConfiguration());
			}
		}
	
		Path in = new Path(args[1]);
		Path out = new Path(args[2]);
		// Set the input/output paths and formats
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Return the job status instead of calling System.exit() here,
		// so ToolRunner (and main) can handle the exit code
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new MapJoin(), args);
		System.exit(res);
	}

}
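A minimal sketch of how the job might be launched; the jar name and HDFS paths are hypothetical. args[0] is the small table to cache, args[1] the input path, args[2] the output directory:

hadoop jar mapjoin.jar com.inspur.mapreduce.join.MapJoin \
    /data/author \
    /data/books \
    /data/output

Note that org.apache.hadoop.filecache.DistributedCache is deprecated in Hadoop 2.x; on newer releases one would typically register cache files with job.addCacheFile(uri) and read them the same way in setup(). The listing above keeps the older API throughout for consistency.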