Big Data (Hadoop MapReduce Worked Examples)
阿新 • Published: 2019-05-30
package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceCaseAvg extends Configured implements Tool {

    public static class AvgMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the line content
            String content = value.toString();
            // Tokenize on whitespace
            StringTokenizer st = new StringTokenizer(content);
            while (st.hasMoreElements()) {
                String strName = st.nextToken();  // student name
                String strScore = st.nextToken(); // student score
                // Emit key/value
                context.write(new Text(strName), new IntWritable(Integer.parseInt(strScore)));
            }
        }
    }

    public static class AvgReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // e.g. <張三, {98, 89, 79}>
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // The average is the sum of all scores divided by the number of subjects
            int sum = 0; // total score
            int num = 0; // number of subjects
            for (IntWritable score : values) {
                sum += score.get(); // accumulate each subject's score
                num++;
            }
            context.write(key, new IntWritable(sum / num)); // integer division truncates
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Job and parameters
        Job job = Job.getInstance(getConf(), "avg mr");
        job.setJarByClass(MapReduceCaseAvg.class);
        /* Set the mapper and reducer classes */
        job.setMapperClass(AvgMapper.class);
        job.setReducerClass(AvgReducer.class);
        /* Set the output key and value types */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        /* Set the input and output paths */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /* Submit the job to the cluster and wait for completion */
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new MapReduceCaseAvg(), args);
        System.exit(res);
    }
}
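To make the averaging job concrete, here is a hypothetical run (the names and scores below are invented for illustration; the mapper expects whitespace-separated name/score pairs). Note that sum / num is integer division, so the average is truncated; note also that AvgReducer could not safely be reused as a combiner, since averaging partial averages would give a different result than averaging all scores at once.

Sample input:

張三 98
張三 89
李四 79
李四 85

Sample output (key and value are tab-separated, TextOutputFormat's default):

張三 93
李四 82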
package com.vip;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceCaseFilte extends Configured implements Tool {

    public static class FilterMapper extends Mapper<Object, Text, NullWritable, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the fields on spaces
            String[] strSrc = value.toString().split(" ");
            if (strSrc.length < 7) {
                return; // skip malformed lines instead of throwing ArrayIndexOutOfBoundsException
            }
            // Keep only fields 0, 1, 2 and 6
            String strDst = strSrc[0] + " " + strSrc[1] + " " + strSrc[2] + " " + strSrc[6];
            context.write(NullWritable.get(), new Text(strDst));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "mrfilter");
        job.setJarByClass(MapReduceCaseFilte.class);
        /* Set the mapper class; no custom reducer is set, so the identity reducer passes records through */
        job.setMapperClass(FilterMapper.class);
        /* Set the output key and value types */
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        /* Set the input and output paths */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /* Submit the job to the cluster and wait for completion */
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new MapReduceCaseFilte(), args);
        System.exit(res);
    }
}
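For illustration, assume space-delimited input lines with at least seven fields (the field names f0..f7 here are placeholders, not from the original). A line such as

f0 f1 f2 f3 f4 f5 f6 f7

is emitted as

f0 f1 f2 f6

Because the key is NullWritable, only the projected fields appear in the output, making this the MapReduce equivalent of a column projection, similar in spirit to cut -d' ' -f1-3,7.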
package com.vip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Anagram grouping driver. As the original notes put it: cat → act, tar → art,
// so the shuffle groups anagrams under one key, e.g. <act, {cat, tac, cta}>.
public class MapReduceCaseWords extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
        // Set up the job
        Job job = Job.getInstance(conf, "words mr");
        job.setJarByClass(MapReduceCaseWords.class);
        /* Set the mapper and reducer classes */
        job.setMapperClass(WordsMapper.class);
        job.setReducerClass(WordsReducer.class);
        /* Set the output key and value types */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        /* Set the input and output paths */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /* Submit the job to the cluster and wait for completion */
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // The HDFS paths are hard-coded here; command-line arguments are ignored
        String[] args0 = {"hdfs://192.168.153.111:9000/input5",
                "hdfs://192.168.153.111:9000/output12"};
        int res = ToolRunner.run(new MapReduceCaseWords(), args0);
        System.exit(res);
    }
}
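A note on running this driver: unlike the other two cases, main ignores its command-line arguments and hard-codes the HDFS input and output paths. Assuming the classes are packaged into a jar named, say, mr-cases.jar (a hypothetical name, not from the original), the job could be launched with:

hadoop jar mr-cases.jar com.vip.MapReduceCaseWords

The driver also deletes any pre-existing output directory before submitting, so the job can be re-run without the usual "output directory already exists" failure. The WordsMapper and WordsReducer it references are defined below.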
package com.vip;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordsMapper extends Mapper<Object, Text, Text, Text> {

    private Text keyText = new Text();
    private Text valueText = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String word = value.toString();
        char[] wordChars = word.toCharArray(); // convert the word to a char array
        Arrays.sort(wordChars);                // sort the characters
        String sword = new String(wordChars);  // turn the sorted chars back into a String
        keyText.set(sword);   // output key: the word's letters in sorted order
        valueText.set(word);  // output value: the original word
        context.write(keyText, valueText); // emit from the map
    }
}
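To make the mapper's behavior concrete: each word is keyed by its letters in sorted order, so all anagrams share a key. Using the example words from the notes above, the mapper emits:

cat → (act, cat)
tac → (act, tac)
cta → (act, cta)
tar → (art, tar)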
package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordsReducer extends Reducer<Text, Text, Text, Text> {

    private Text outputKey = new Text();   // output key
    private Text outputValue = new Text(); // output value

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String output = "";
        // Join the words that share the same letters with the ~ character
        for (Text word : values) {
            if (!output.equals("")) {
                output = output + "~";
            }
            output = output + word.toString();
        }
        // Only output groups that contain two or more words
        StringTokenizer outputTokenize = new StringTokenizer(output, "~");
        if (outputTokenize.countTokens() >= 2) {
            output = output.replaceAll("~", ",");
            outputKey.set(key.toString()); // set the key
            outputValue.set(output);       // set the value
            context.write(outputKey, outputValue); // emit
        }
    }
}
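Continuing the example: after the shuffle the reducer receives <act, {cat, tac, cta}>, joins the words with ~, and, since the group holds at least two words, rewrites the separator to a comma and emits (tab-separated):

act	cat,tac,cta

A singleton group such as <art, {tar}> is suppressed by the countTokens() >= 2 check, so only keys with two or more anagrams appear in the output.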