1. 程式人生 > >MapReduce 之 ---MapJoin

MapReduce 之 ---MapJoin

package com.ghgj.cn.mapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//這種方法是小檔案載入,匹配大檔案,不需要reduce,所有將reducetask關掉

public class MapJoin {
	static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
		Text mk=new Text();
		Text mv=new Text();
		Map<String,String> map=new HashMap<String,String>();
		//讀取movies這個表  將表中的所有資料先載入過來
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			//進行movies檔案的讀取
			Path[] localCacheFiles = context.getLocalCacheFiles();
			//快取檔案路徑
			Path path=localCacheFiles[0];
			//建立一個字元流
			BufferedReader br=new BufferedReader(new FileReader(path.toString()));
			String line=null;
			while((line=br.readLine())!=null){
				//1::Toy Story (1995)::Animation|Children's|Comedy
				String[] split = line.split("::");
				map.put(split[0], split[1]+"\t"+split[2]);
			}
			
		}
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			//先讀取ratings檔案  每次讀取一行   和map集合中的資料進行關聯
			// 1::1193::5::978300760   ratings.dat
			String[] split = value.toString().split("::");
			String k=split[1];
			if(map.containsKey(k)){
				//進行關聯  取出map的value  和   現在的資料進行關聯
				String res=map.get(k)+"\t"+split[0]+"\t"+split[2]+"\t"+split[3];
				mk.set(k);
				mv.set(res);
				context.write(mk, mv);
				
			}
		}
	}
		public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
			System.setProperty("HADOOP_USER_NAME", "hadoop");
			//載入配置檔案
			Configuration conf=new Configuration();
			//啟動一個job  這裡的mr任務叫做job   這個job作用  封裝mr任務
			Job job=Job.getInstance(conf);
			
			//指定當前任務的主類   jarclass=Driver.class
			job.setJarByClass(MapJoin.class);
			
			//指定map  
			job.setMapperClass(MyMapper.class);
			
			/*
			 * 泛型:jdk1.5  泛型的編譯的時候生效  檢查程式碼的型別是否匹配  執行的時候自動擦除
			 */
			//指定map輸出的key  value的型別
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			
			//將指定的檔案載入到每一個執行計算任務節點的快取中
			job.addCacheFile(new URI("hdfs://hadoop01:9000/join_in/movies.dat"));
			//不需要reducetask設定這裡
			job.setNumReduceTasks(0);
			//修改切片的大小
			FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/join_in/ratings.dat"));
			//指定輸出目錄  輸出路徑不能存在  否則報錯  程式碼執行的餓時候  會幫你建立
			
			FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/mapjoin_out01"));
			
			//提交job  引數:是否列印執行日誌
			job.waitForCompletion(true);
		}
}