Count how many times each word appears in each file, and sort the counts in descending order
阿新 · Published: 2018-12-24
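A quick outline of the approach: the mapper uses setup() to read the input file's name from its FileSplit, then emits every word of every line as a key with that file name as the value. For each word, the reducer receives the list of file names the word appeared in, tallies one count per file, and writes a single line with the file:count pairs ordered from the larger count to the smaller. The main() driver wires the two classes together and points the job at the HDFS input and output paths.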
package kaoshi3;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Count how many times each word appears in each file, and list the counts in descending order
public class wordcount {

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text mk = new Text();
        Text mv = new Text();
        String filename = "";

        // setup() runs once per map task, before any map() call; use it to look up the input file's name
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            InputSplit insplit = context.getInputSplit(); // get this task's input split from the context
            FileSplit fs = (FileSplit) insplit;           // cast to FileSplit; InputSplit itself exposes no file name
            filename = fs.getPath().getName();            // remember the file name for every record this task maps
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // sample input line: "liangchaowei love liujialing"
            String[] sp = value.toString().split(" ");
            for (String word : sp) {
                mk.set(word);
                mv.set(filename);
                context.write(mk, mv); // emit the word as the key and the file name as the value
            }
        }
    }
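    // For the sample line "liangchaowei love liujialing" read from mapreduce-4-1.txt, the mapper emits
    // (liangchaowei, mapreduce-4-1.txt), (love, mapreduce-4-1.txt) and (liujialing, mapreduce-4-1.txt);
    // the shuffle then groups all file names under each word before reduce() is called.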
    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text t = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // per-key state is kept in locals so every word starts counting from zero
            // (as instance fields these counters would carry over between keys)
            int sum1 = 0;
            int sum2 = 0;
            String file1 = "";
            String file2 = "";
            for (Text v : values) {
                String sv = v.toString(); // each value is the name of a file the word appeared in
                if (sv.startsWith("mapreduce-4-1.txt")) { // this job's input set has exactly two files
                    file1 = sv; // first file's name
                    sum1++;     // occurrences of the word in the first file
                } else {
                    file2 = sv; // second file's name
                    sum2++;     // occurrences of the word in the second file
                }
            }
            // write the file with the higher count first, i.e. counts in descending order
            String out;
            if (sum1 > sum2) {
                out = file1 + ":" + sum1 + "\t" + file2 + ":" + sum2; // plain String concatenation is enough for this small output
            } else {
                out = file2 + ":" + sum2 + "\t" + file1 + ":" + sum1;
            }
            t.set(out);
            context.write(key, t);
        }
    }
    public static void main(String[] args)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // needed when submitting from a local machine to the cluster as user "hadoop"
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // load the configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);            // create a job
        job.setJarByClass(kaoshi3.wordcount.class); // let Hadoop find the jar via the driver class
        job.setMapperClass(MyMapper.class);         // mapper class
        job.setReducerClass(MyReducer.class);       // reducer class
        job.setOutputKeyClass(Text.class);          // output key type
        job.setOutputValueClass(Text.class);        // output value type
        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/ksin02")); // input path
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf);
        Path path = new Path("/ksout02");           // output path
        if (fs.exists(path)) {                      // the output path must not already exist, so delete it if it does
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        job.waitForCompletion(true);                // submit the job and wait for it to finish
    }
}
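As a hedged illustration of the result (the second file name and the counts below are made up for this example, not taken from the original post): with two input files mapreduce-4-1.txt and mapreduce-4-2.txt under /ksin02, where "love" occurs 3 times in the first file and once in the second, the line written for that word to /ksout02/part-r-00000 would look like:

love	mapreduce-4-1.txt:3	mapreduce-4-2.txt:1

Each output line therefore shows the word, followed by tab-separated file:count pairs, with the file that has the higher count listed first.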