MapReduce 之 ---MapJoin
阿新 • • 發佈:2018-12-24
package com.ghgj.cn.mapjoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //這種方法是小檔案載入,匹配大檔案,不需要reduce,所有將reducetask關掉 public class MapJoin { static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{ Text mk=new Text(); Text mv=new Text(); Map<String,String> map=new HashMap<String,String>(); //讀取movies這個表 將表中的所有資料先載入過來 @Override protected void setup(Context context) throws IOException, InterruptedException { //進行movies檔案的讀取 Path[] localCacheFiles = context.getLocalCacheFiles(); //快取檔案路徑 Path path=localCacheFiles[0]; //建立一個字元流 BufferedReader br=new BufferedReader(new FileReader(path.toString())); String line=null; while((line=br.readLine())!=null){ //1::Toy Story (1995)::Animation|Children's|Comedy String[] split = line.split("::"); map.put(split[0], split[1]+"\t"+split[2]); } } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { //先讀取ratings檔案 每次讀取一行 和map集合中的資料進行關聯 // 1::1193::5::978300760 ratings.dat String[] split = value.toString().split("::"); String k=split[1]; if(map.containsKey(k)){ //進行關聯 取出map的value 和 現在的資料進行關聯 String res=map.get(k)+"\t"+split[0]+"\t"+split[2]+"\t"+split[3]; mk.set(k); mv.set(res); context.write(mk, mv); } } } public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException { System.setProperty("HADOOP_USER_NAME", "hadoop"); //載入配置檔案 Configuration conf=new Configuration(); //啟動一個job 這裡的mr任務叫做job 這個job作用 封裝mr任務 Job job=Job.getInstance(conf); //指定當前任務的主類 jarclass=Driver.class job.setJarByClass(MapJoin.class); //指定map job.setMapperClass(MyMapper.class); /* * 泛型:jdk1.5 泛型的編譯的時候生效 檢查程式碼的型別是否匹配 執行的時候自動擦除 */ //指定map輸出的key value的型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //將指定的檔案載入到每一個執行計算任務節點的快取中 job.addCacheFile(new URI("hdfs://hadoop01:9000/join_in/movies.dat")); //不需要reducetask設定這裡 job.setNumReduceTasks(0); //修改切片的大小 FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/join_in/ratings.dat")); //指定輸出目錄 輸出路徑不能存在 否則報錯 程式碼執行的餓時候 會幫你建立 FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/mapjoin_out01")); //提交job 引數:是否列印執行日誌 job.waitForCompletion(true); } }