
Writing a MapReduce job: for each keyword, report which file and which line it appears on, and how many times
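The idea is straightforward: in setup() the mapper reads the current file name from the FileSplit, in map() it counts how many times each word occurs on the current line, and it emits the word as the key and "filename:lineNumber,count" as the value. The reducer then concatenates all of those entries for the same word into a single output line. The full code follows.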

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Count, for each keyword, how many times it appears on each line of each file
public class kaoshi1 {

    // NOTE: counting line numbers with static fields assumes each file is read in order
    // by a single map task (e.g. one split per file); with multiple splits the numbers drift.
    private static int count1 = 0;   // current line number in mapreduce-4-1.txt
    private static int count2 = 0;   // current line number in the other file

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text mk = new Text();
        Text mv = new Text();
        String filename = "";

        // setup() runs once when the map task starts; use it to get the file information
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Obtain the file name from the input split
            InputSplit insplit = context.getInputSplit();
            FileSplit fs = (FileSplit) insplit;
            filename = fs.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Map used to count how many times each word appears on the current line
            Map<String, Integer> word = new HashMap<>();
            // e.g. "liangchaowei love liujialing"
            String[] sp = value.toString().split(" ");
            if (filename.startsWith("mapreduce-4-1.txt")) {   // check which file this line comes from
                count1++;   // line number within this file
                for (String v : sp) {
                    if (word.containsKey(v)) {            // word already seen on this line
                        word.put(v, word.get(v) + 1);     // increment its count for this line
                    } else {
                        word.put(v, 1);
                    }
                }
                for (String k : word.keySet()) {
                    mk.set(k);
                    // value format: filename:lineNumber,count
                    mv.set(filename + ":" + count1 + "," + word.get(k));
                    context.write(mk, mv);
                }
            } else {   // same logic for the other file
                count2++;
                for (String v : sp) {
                    if (word.containsKey(v)) {
                        word.put(v, word.get(v) + 1);
                    } else {
                        word.put(v, 1);
                    }
                }
                for (String k : word.keySet()) {
                    mk.set(k);
                    mv.set(filename + ":" + count2 + "," + word.get(k));
                    context.write(mk, mv);
                }
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {

        Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Concatenate every "filename:lineNumber,count" entry for this word
            StringBuilder sb = new StringBuilder();
            for (Text v : values) {
                sb.append(v.toString()).append("\t");
            }
            outValue.set(sb.toString());
            context.write(key, outValue);
        }
    }

    public static void main(String[] args)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // User mapping when submitting from a local machine
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Configuration conf = new Configuration();             // load configuration
        Job job = Job.getInstance(conf);                      // create the job

        job.setJarByClass(kaoshi1.class);                     // main (driver) class for the jar
        job.setMapperClass(MyMapper.class);                   // mapper class
        job.setReducerClass(MyReducer.class);                 // reducer class

        job.setOutputKeyClass(Text.class);                    // output key type
        job.setOutputValueClass(Text.class);                  // output value type

        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/ksin02"));   // input path

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf);        // HDFS file system object
        Path path = new Path("/ksout05");                     // output path
        if (fs.exists(path)) {                                // delete it first, otherwise the job fails to start
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);            // output directory must not already exist

        job.waitForCompletion(true);                          // true = print progress logs
    }
}
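For a quick sanity check of the per-line counting and the value format, here is a minimal standalone sketch (no Hadoop required) that mimics what map() emits for a single input line; the file name and line content are hypothetical, chosen to match the prefix check in the mapper:

import java.util.HashMap;
import java.util.Map;

public class LineCountSketch {
    public static void main(String[] args) {
        String filename = "mapreduce-4-1.txt";  // hypothetical file name
        int lineNumber = 1;                     // hypothetical line number
        String line = "liangchaowei love liujialing love";

        // Same per-line counting the mapper performs
        Map<String, Integer> word = new HashMap<>();
        for (String v : line.split(" ")) {
            word.put(v, word.getOrDefault(v, 0) + 1);
        }
        // Same "filename:lineNumber,count" value format the mapper writes
        for (String k : word.keySet()) {
            System.out.println(k + "\t" + filename + ":" + lineNumber + "," + word.get(k));
        }
    }
}

For this sample line the sketch prints, among others, "love	mapreduce-4-1.txt:1,2"; in the real job the reducer would then join all such entries for the same word, tab-separated, into one output line.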