Writing MapReduce: for each keyword, count which file it appears in, on which line, and how many times
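To make the goal concrete, here is a rough sketch of the expected input and output. The file contents below and the second file name (mapreduce-4-2.txt) are made up for illustration; only mapreduce-4-1.txt appears in the code.

mapreduce-4-1.txt:
liangchaowei love liujialing
love love

mapreduce-4-2.txt:
liujialing love

For each keyword the job writes one line: the keyword, then a tab-separated list of filename:line,count entries describing where and how often it appeared (the order of the entries is not guaranteed), for example:

love	mapreduce-4-1.txt:1,1	mapreduce-4-1.txt:2,2	mapreduce-4-2.txt:1,1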
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Count, for each keyword, how many times it appears on each line of each file
public class kaoshi1 {

    // line counters for the two input files (incremented once per map() call)
    private static int count1 = 0;
    private static int count2 = 0;

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text mk = new Text();
        Text mv = new Text();
        String filename = "";

        @Override
        // setup() runs once when the map task starts, so the file information can be fetched here
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // get the file name from the input split
            InputSplit insplit = context.getInputSplit();
            FileSplit fs = (FileSplit) insplit;
            filename = fs.getPath().getName();
        }

        @Override
        protected void map(LongWritable key,
                           Text value,
                           Context context)
                throws IOException, InterruptedException {
            // map used to count how often each word occurs on the current line
            Map<String, Integer> word = new HashMap<>();
            // example input line: liangchaowei love liujialing
            String[] sp = value.toString().split(" ");
            if (filename.startsWith("mapreduce-4-1.txt")) { // first check which file the line comes from
                count1++; // line number within this file
                for (String v : sp) {
                    if (word.containsKey(v)) { // word already seen on this line?
                        word.put(v, word.get(v) + 1); // increment its count for this line
                    } else {
                        word.put(v, 1);
                    }
                    System.out.println(count1 + "----------------"); // debug output
                }
                for (String k : word.keySet()) {
                    mk.set(k);
                    // pack "filename:lineNumber,count" into the value
                    mv.set(filename + ":" + count1 + "," + word.get(k));
                    System.out.println(word.get(k)); // debug output
                    context.write(mk, mv);
                }
            } else { // same logic for the other file
                count2++;
                for (String v : sp) {
                    if (word.containsKey(v)) {
                        word.put(v, word.get(v) + 1);
                    } else {
                        word.put(v, 1);
                    }
                }
                for (String k : word.keySet()) {
                    mk.set(k);
                    mv.set(filename + ":" + count2 + "," + word.get(k));
                    System.out.println(word.get(k)); // debug output
                    context.write(mk, mv);
                }
            }
        }
    }
    static class MyReducer extends Reducer<Text, Text, Text, Text> {

        Text outValue = new Text();

        @Override
        protected void reduce(Text key,
                              Iterable<Text> values,
                              Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer(); // buffer used to concatenate the values
            // each incoming value has the form filename:lineNumber,count
            System.out.println(key + "--------------++++++++++++"); // debug output
            for (Text v : values) {
                String[] sp = v.toString().split(":"); // only used for the debug print below
                sb.append(v.toString() + "\t"); // append this value to the result
                System.out.println(sp[0]); // debug: check that data arrived
            }
            outValue.set(sb.toString());
            context.write(key, outValue);
        }
    }
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // set the user name for HDFS when running locally
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // load the configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);            // create the Job
        job.setJarByClass(kaoshi1.class);           // main class of the jar (the driver class)
        job.setMapperClass(MyMapper.class);         // mapper class
        job.setReducerClass(MyReducer.class);       // reducer class
        job.setOutputKeyClass(Text.class);          // output key type
        job.setOutputValueClass(Text.class);        // output value type
        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/ksin02")); // input path
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf);      // get an HDFS FileSystem object
        Path path = new Path("/ksout05");           // output path
        if (fs.exists(path)) {                      // delete the output directory if it exists, otherwise the job fails to start
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);  // set the output path (the directory must not already exist)
        job.waitForCompletion(true);                // true = print progress logs
    }
}
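For reference, the intermediate pairs the mapper emits for the hypothetical first line above ("liangchaowei love liujialing", line 1 of mapreduce-4-1.txt) would be:

liangchaowei	mapreduce-4-1.txt:1,1
love	mapreduce-4-1.txt:1,1
liujialing	mapreduce-4-1.txt:1,1

One caveat about the design: the line numbers come from the static fields count1 and count2, which are simply incremented once per map() call, so they only match real line numbers when each file is read in order by a single map task (for example, a local run where each file is a single split).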