Map-side join in MapReduce
阿新 • Published: 2019-02-09
package com.inspur.mapreduce.join;

/*************************************
 * @author: caolch
 * @date: 2013-12-31
 * @note: map-side join implemented in the mapper; the small table is read into memory
 *************************************/

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoin extends Configured implements Tool {

    public static class myMapper extends Mapper<Object, Text, Text, Text> {

        // In-memory copy of the small (author) table: join key -> joined fields
        private HashMap<String, String> authorMap = new HashMap<String, String>();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(":::");
            if (tokens.length < 2) {
                return; // skip malformed lines
            }
            String joinData = authorMap.get(tokens[1]);
            if (joinData != null) {
                context.write(new Text(tokens[0]), new Text(joinData));
            }
        }

        // setup() runs once per task, before any call to map()
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // Get the local paths of the files placed in the distributed cache
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            // Load the cached author file into the in-memory map
            if (cacheFiles != null && cacheFiles.length > 0) {
                String line;
                String[] tokens;
                for (Path path : cacheFiles) {
                    if (path.toString().contains("author")) {
                        BufferedReader br = new BufferedReader(new FileReader(path.toString()));
                        try {
                            while ((line = br.readLine()) != null) {
                                tokens = line.split(":::", 2);
                                authorMap.put(tokens[0], tokens[1]);
                            }
                        } finally {
                            br.close();
                        }
                    }
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "MapJoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(myMapper.class);
        job.setNumReduceTasks(0); // map-only job: no shuffle, no reducer

        // Add the small table to the distributed cache
        Path cacheFilePath = new Path(args[0]);
        FileSystem hdfs = FileSystem.get(conf);
        FileStatus fileStatus = hdfs.getFileStatus(cacheFilePath);
        if (!fileStatus.isDir()) {
            // The path is a single file: cache it directly
            DistributedCache.addCacheFile(cacheFilePath.toUri(), job.getConfiguration());
        } else {
            // The path is a directory: cache every file inside it
            for (FileStatus fs : hdfs.listStatus(cacheFilePath)) {
                DistributedCache.addCacheFile(fs.getPath().toUri(), job.getConfiguration());
            }
        }

        Path in = new Path(args[1]);
        Path out = new Path(args[2]);
        // Configure input/output paths and formats
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the exit status instead of calling System.exit() here,
        // so ToolRunner in main() can report it
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MapJoin(), args);
        System.exit(res);
    }
}
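The program above performs a map-side join: the small author table is shipped to every map task through the DistributedCache, loaded into a HashMap in setup(), and probed for each input record in map(), so the join completes without a shuffle or reduce phase. As a minimal sketch of how a run might look, the file names, jar name, paths, and ":::"-delimited sample records below are hypothetical, not taken from the original post; note that the cached file's name must contain "author" for setup() to pick it up.

Sample author file (the small table, passed as args[0]):

    a001:::Tom Smith
    a002:::Jane Doe

Sample input file (the large table, passed as args[1]):

    book1:::a001
    book2:::a002
    book3:::a009

Launching the job (args: cache path, input path, output path):

    hadoop jar mapjoin.jar com.inspur.mapreduce.join.MapJoin /data/author.txt /data/books /data/out

Expected output in /data/out (records whose key is missing from the author map, such as book3, are silently dropped, i.e. this is an inner join):

    book1	Tom Smith
    book2	Jane Doe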