MapReduce beginner examples
1. Word count
As the name suggests, word count tallies how often each word occurs in a large data set. It is the classic MapReduce program.
1.1 Example description
Count the words that appear in the input files and write the results to a new file.
Sample input (two input files):
1)
java hadoop
hbase hadoop java
hive hotspot
2)
hadoop hotspot
hive java spark
Sample output:
hadoop	3
hbase	1
hive	2
hotspot	2
java	3
spark	1
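1.2 Design approach
In the map phase, the input key is a LongWritable, which is the byte offset at which the line starts. The input value is a Text holding the content of that line (not a single word). The mapper splits the line into words and emits <word, 1> for each one. The shuffle then groups these pairs by key, and the reducer is called once per word. It iterates over that word's values, adds them up, and writes the total as the output value, which gives the word's count.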
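The sketch below makes the data flow above concrete without a cluster. It is a minimal local simulation in plain Java (Java 8 or later, no Hadoop dependency) that only imitates the map, shuffle and reduce steps. The class name `WordCountSim` and its methods are hypothetical and not part of the Hadoop API. A `TreeMap` stands in for the shuffle. Like the shuffle, it groups values by key and keeps the keys sorted, much as Hadoop hands keys to each reducer in sorted order.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the word-count data flow (illustration only, not Hadoop code).
public class WordCountSim {

    // "map": one input line -> list of (word, 1) pairs
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) { // a blank line splits into one empty string
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // "reduce": one key plus all of its values -> their sum (key unused, kept to mirror reduce())
    static int reduce(String key, List<Integer> values) {
        int count = 0;
        for (int v : values) {
            count += v;
        }
        return count;
    }

    // Map every line, group the pairs by key ("shuffle", sorted by TreeMap), then reduce each group
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "java hadoop", "hbase hadoop java", "hive hotspot", // file 1
                "hadoop hotspot", "hive java spark");               // file 2
        run(lines).forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

Running `main` prints one `word<TAB>count` line per word, in the same order and with the same counts as the sample output.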
1.3 Program code
package cn.itcast.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper: input <byte offset, line>, output <word, 1>
public class wordcountmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reuse the output objects across calls instead of allocating new ones per word
    private final Text text = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on runs of whitespace so repeated spaces or tabs do not produce empty "words"
        String[] words = value.toString().trim().split("\\s+");
        for (String word : words) {
            if (word.isEmpty()) {
                continue; // a blank line splits into a single empty string
            }
            text.set(word);
            context.write(text, one);
        }
    }
}
package cn.itcast.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer: input <word, [1, 1, ...]>, output <word, total count>
public class wordcountreduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        // Add up every count emitted for this word
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
package cn.itcast.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver: args[0] = input path, args[1] = output path (must not exist yet)
public class wordcountdrive {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Lets Hadoop find the jar that contains this class
        job.setJarByClass(wordcountdrive.class);
        job.setMapperClass(wordcountmapper.class);
        job.setReducerClass(wordcountreduce.class);
        // Types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Types of the final (reduce) output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job, wait for it to finish, and exit with 0 on success or 1 on failure
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
2. Data deduplication
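"Data deduplication" is mainly an exercise in understanding and applying parallel processing to filter data meaningfully. Several tasks that look complicated involve deduplication. Examples are counting the number of distinct values in a large data set and working out visitors' locations from website logs. Let's now move on to the MapReduce program for this example.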
2.1 Example description
Sample input (two input files, one value per line):
1)
66
55
23
23
55
2)
12
23
12
66
Sample output:
12
23
55
66
2.2 Design approach
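The goal of deduplication is that any datum occurring more than once in the raw data appears only once in the output. The natural approach is to send every record of the same datum to the same reducer. No matter how many times the datum occurs, it only has to be written once. Concretely, the reduce input should use the datum as its key, and nothing is required of the value-list. When reduce receives a <key, value-list>, it copies the key straight to the output key and sets the output value to empty.
In the MapReduce flow, the shuffle gathers the <key, value> pairs emitted by map into <key, value-list> and then hands them to reduce. Working backward from the reduce input designed above, the map output key must be the datum, and the value can be anything. Going one step further back, each datum in this example is one line of the input file. Hadoop's default input format (TextInputFormat) delivers each line as the map input value. So map only has to use the value as its output key and write it out, with an arbitrary output value. After the shuffle, reduce does not care how many values each key has. It simply copies the input key to the output key and writes it, with the output value set to empty.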
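The backward reasoning above can be checked with a small local sketch in plain Java (no Hadoop). It assumes, as the design does, that each datum is on its own line. `DedupSim` is a hypothetical name used only for illustration. The keys are strings, so they sort lexicographically; for example, "100" comes before "23". Hadoop orders `Text` keys by their bytes in the same way for ASCII data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the deduplication data flow (illustration only, not Hadoop code).
public class DedupSim {

    static List<String> run(List<String> lines) {
        // map: the whole line becomes the key; the value is an empty placeholder
        // shuffle: group the placeholders by key; TreeMap keeps the keys sorted
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            grouped.computeIfAbsent(line, k -> new ArrayList<>()).add("");
        }
        // reduce: runs once per distinct key and emits that key exactly once, ignoring the values
        List<String> output = new ArrayList<>();
        for (String key : grouped.keySet()) {
            output.add(key);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "66", "55", "23", "23", "55", // file 1
                "12", "23", "12", "66");      // file 2
        for (String key : run(lines)) {
            System.out.println(key);
        }
    }
}
```

For the sample data, `main` prints 12, 23, 55 and 66, one per line.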
2.3 Program code
package cn.cast.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper: the whole input line becomes the output key; the value is an empty placeholder
public class RemoveRepeatmapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text empty = new Text("");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, empty);
    }
}
package cn.cast.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer: called once per distinct line, so writing only the key removes the duplicates
public class RemoveRepeatreducer extends Reducer<Text, Text, Text, Text> {
    private final Text empty = new Text("");

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The values are ignored; the key is written exactly once
        context.write(key, empty);
    }
}
package cn.cast.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver: args[0] = input path, args[1] = output path (must not exist yet)
public class RemoveRepeatdrive {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(RemoveRepeatdrive.class);
        job.setMapperClass(RemoveRepeatmapper.class);
        job.setReducerClass(RemoveRepeatreducer.class);
        // Both the map output and the final output are <Text, Text>
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
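3. Average score per subject
"Average score" mainly revisits the classic WordCount example and is only a slight variation on it. This example computes the students' average score in each subject.
3.1 Example description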
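Sample input (one file per student; the name only identifies whose file it is, because the mapper expects every line in the file to have the form "subject score"):
1) 張三:
math 88
english 86
history 78
2) 李四:
math 98
english 66
history 82
Sample output:
english	76
history	80
math	93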
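3.2 Design approach
The mapper reads each line and splits it into subject name and score. It emits the subject as the map output key and the score as the map output value (types Text and IntWritable). On the reduce side, all scores for one subject arrive through the values iterator. The reducer adds them up while counting them, then divides the sum by that count to get the average. In the sample the count is 2, one score per student.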
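The sketch below shows this map and reduce logic locally, again in plain Java without Hadoop. `AverageSim` is a hypothetical name. It divides each subject's total by the number of scores actually received rather than by a fixed constant, so it keeps working when the number of students changes. Integer division truncates, matching the `IntWritable` output type used by the job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the per-subject average (illustration only, not Hadoop code).
public class AverageSim {

    static Map<String, Integer> run(List<String> lines) {
        // map + shuffle: "subject score" lines grouped into subject -> [scores...]
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length < 2) {
                continue; // skip blank or malformed lines
            }
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(Integer.parseInt(parts[1]));
        }
        // reduce: sum the scores and divide by how many there are (integer division)
        Map<String, Integer> averages = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int s : e.getValue()) {
                sum += s;
            }
            averages.put(e.getKey(), sum / e.getValue().size());
        }
        return averages;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "math 88", "english 86", "history 78",  // first student
                "math 98", "english 66", "history 82"); // second student
        run(lines).forEach((subject, avg) -> System.out.println(subject + "\t" + avg));
    }
}
```

For the sample input, `main` prints english 76, history 80 and math 93.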
3.3 Program code
package cn.itcast.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper: input <byte offset, "subject score">, output <subject, score>
public class Averagemapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] results = value.toString().trim().split("\\s+");
        // Skip blank or malformed lines instead of failing with ArrayIndexOutOfBoundsException
        if (results.length < 2) {
            return;
        }
        String subject = results[0];
        int score = Integer.parseInt(results[1]);
        context.write(new Text(subject), new IntWritable(score));
    }
}
package cn.itcast.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer: input <subject, [score, score, ...]>, output <subject, average score>
public class Averagereducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int n = 0;
        for (IntWritable intWritable : values) {
            sum += intWritable.get();
            n++;
        }
        // Divide by the actual number of scores instead of a hard-coded student count;
        // integer division truncates, matching the IntWritable output type
        context.write(key, new IntWritable(sum / n));
    }
}
package cn.itcast.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver: args[0] = input path, args[1] = output path (must not exist yet)
public class Averagedriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Averagedriver.class);
        job.setMapperClass(Averagemapper.class);
        job.setReducerClass(Averagereducer.class);
        // Map output and final output are both <Text, IntWritable>
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Reference: http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html
To be continued.