Java MapReduce 基本計算操作實現實戰
package com.sct.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
/**
* Created by leitao on 2017/3/13.
* 排序
*/
public class Sort {
//map將輸入中的value化成IntWritable型別,作為輸出的key
public static class Map extends Mapper<Object,Text,IntWritable,IntWritable>{
private static IntWritable data = new IntWritable();
//實現map函式
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
data.set(Integer.parseInt(line));
context.write(data,new IntWritable(1));
}
}
//reduce 將輸入的key複製輸出資料的key上
//然後根據輸入的value-list中的元素的個數決定key的輸出次數
//用全域性linenum來代表key的位次
public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
private static IntWritable linenum = new IntWritable(1);
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable val : values){
context.write(linenum,key);
linenum = new IntWritable(linenum.get()+1);
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//這句話很關鍵
conf.set("mapred.job.tracker","192.168.113.130");
String[] ioArgs = new String[]{"sort_in","sort_in_1","sort_out"};
String[] otherArgs = new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
if (otherArgs.length!=3){
System.err.println("Useage:Data Sort <in> <out>");
System.exit(3);
}
Job job = new Job(conf,"Data Sort");
job.setJarByClass(Sort.class);
//設定Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//設定輸出型別
job.setOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
//設定輸入輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//設定多個輸入目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[2]));
boolean flag = job.waitForCompletion(true);
if (flag){
System.out.println("排序成功!");
}else {
System.out.println("排序失敗!");
}
System.exit(1);
}
}
3、平均成績
"平均成績"主要目的還是在重溫經典"WordCount"例子,可以說是在基礎上的微變化版,該例項主要就是實現一個計算學生平均成績的例子。3.1 例項描述
對輸入檔案中資料進行就算學生平均成績。輸入檔案中的每行內容均為一個學生的姓名和他相應的成績,如果有多門學科,則每門學科為一個檔案。要求在輸出中每行有兩個間隔的資料,其中,第一個代表學生的姓名,第二個代表其平均成績。
樣本輸入:
1)math:
張三 88
李四 99
王五 66
趙六 77
2)china:
張三 78
李四 89
王五 96
趙六 67
3)english:
張三 80
李四 82
王五 84
趙六 86
樣本輸出:
張三 82
李四 90
王五 82
趙六 76
3.2 設計思路
計算學生平均成績是一個仿"WordCount"例子,用來重溫一下開發MapReduce程式的流程。程式包括兩部分的內容:Map部分和Reduce部分,分別實現了map和reduce的功能。
Map處理的是一個純文字檔案,檔案中存放的資料時每一行表示一個學生的姓名和他相應一科成績。Mapper處理的資料是由InputFormat分解過的資料集,其中InputFormat的作用是將資料集切割成小資料集InputSplit,每一個InputSlit將由一個Mapper負責處理。此外,InputFormat中還提供了一個RecordReader的實現,並將一個InputSplit解析成<key,value>對提供給了map函式。InputFormat的預設值是TextInputFormat,它針對文字檔案,按行將文字切割成InputSlit,並用LineRecordReader將InputSplit解析成<key,value>對,key是行在文字中的位置,value是檔案中的一行。
Map的結果會通過partion分發到Reducer,Reducer做完Reduce操作後,將通過以格式OutputFormat輸出。
Mapper最終處理的結果對<key,value>,會送到Reducer中進行合併,合併的時候,有相同key的鍵/值對則送到同一個Reducer上。Reducer是所有使用者定製Reducer類地基礎,它的輸入是key和這個key對應的所有value的一個迭代器,同時還有Reducer的上下文。Reduce的結果由Reducer.Context的write方法輸出到檔案中。
3.3 程式程式碼
程式程式碼如下所示:package com.sct.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
/**
* Created by leitao on 2017/3/13.
*計算品均成績
*/
public class Score {
public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//將輸入的純文字檔案的資料化成String
String line = value.toString();
//將輸入的資料首先按行進行切割
StringTokenizer tokenizer = new StringTokenizer(line,"\n");
//分別對每一行進行處理
while (tokenizer.hasMoreElements()){
//每行按空格劃分
StringTokenizer stringTokenizer = new StringTokenizer(tokenizer.nextToken());
String strName = stringTokenizer.nextToken();//學生姓名部分
String strScore = stringTokenizer.nextToken();//成績部分
Text name = new Text(strName);
int scoreInt = Integer.parseInt(strScore);
//輸出姓名和成績
context.write(name,new IntWritable(scoreInt));
}
}
}
public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum =0;
int count =0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()){
sum +=iterator.next().get();//計算總分
count++;
}
int average = (int)sum/count;//計算平均分
context.write(key,new IntWritable(average));
}
}
public static void main(String[] arge) throws Exception{
Configuration conf = new Configuration();
conf.set("mapred.job.tracker","192.168.113.130");
String[] ioArgs = new String[]{"score_in","score_out"};
String[] otherArgs = new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
if (otherArgs.length!=2){
System.err.println("Usage:Score Avarage <in> <out>");
System.exit(2);
}
Job job = new Job(conf,"Score Average");
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//將輸入資料集分割成小資料塊splites,提供一個RecordReder的實現
job.setInputFormatClass(TextInputFormat.class);
//提供一個RecordWriter的實現,負責資料輸出
job.setOutputFormatClass(TextOutputFormat.class);
//設定輸入輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
boolean flag = job.waitForCompletion(true);
if (flag){
System.out.println("平均成績計算成功!");
}else {
System.out.println("平均成績計算失敗!");
}
System.exit(1);
}
}
4、單表關聯
前面的例項都是在資料上進行一些簡單的處理,為進一步的操作打基礎。"單表關聯"這個例項要求從給出的資料中尋找所關心的資料,它是對原始資料所包含資訊的挖掘。下面進入這個例項。
4.1 例項描述
例項中給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。
樣例輸入如下所示。
file:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
家族樹狀關係譜:
樣例輸出如下所示。
file:
grandchild grandparent
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
4.2 設計思路
分析這個例項,顯然需要進行單表連線,連線的是左表的parent列和右表的child列,且左表和右表是同一個表。
連線結果中除去連線的兩列就是所需要的結果——"grandchild--grandparent"表。要用MapReduce解決這個例項,首先應該考慮如何實現表的自連線;其次就是連線列的設定;最後是結果的整理。
考慮到MapReduce的shuffle過程會將相同的key會連線在一起,所以可以將map結果的key設定成待連線的列,然後列中相同的值就自然會連線在一起了。再與最開始的分析聯絡起來:
要連線的是左表的parent列和右表的child列,且左表和右表是同一個表,所以在map階段將讀入資料分割成child和parent之後,會將parent設定成key,child設定成value進行輸出,並作為左表;再將同一對child和parent中的child設定成key,parent設定成value進行輸出,作為右表。為了區分輸出中的左右表,需要在輸出的value中再加上左右表的資訊,比如在value的String最開始處加上字元1表示左表,加上字元2表示右表。這樣在map的結果中就形成了左表和右表,然後在shuffle過程中完成連線。reduce接收到連線的結果,其中每個key的value-list就包含了"grandchild--grandparent"關係。取出每個key的value-list進行解析,將左表中的child放入一個陣列,右表中的parent放入一個陣列,然後對兩個陣列求笛卡爾積就是最後的結果了。
4.3 程式程式碼
程式程式碼如下所示。
package com.sct.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
/**
* Created by leitao on 2017/3/13.
* 單表關聯
*/
public class STjoin {
public static int time =0;
/*
map將輸出分割child和parent,然後正序輸出一次作為右表,
反序輸出一次作為左表,需要注意的是在輸出的value中必須
加上左右表的區別標識
*/
public static class Map extends Mapper<Object,Text,Text,Text>{
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String childname = new String();//孩子名稱
String parentname = new String();//父母名稱
String relationtype = new String();//左右表標識
//輸入的一行預處理文字
StringTokenizer itr = new StringTokenizer(value.toString());
String[] values = new String[2];
int i = 0;
while (itr.hasMoreTokens()){
values[i] = itr.nextToken();
i++;
}
if (values[0].compareTo("child")!=0){
childname = values[0];
parentname = values[1];
//輸出左表
relationtype = "1";
context.write(new Text(values[1]),new Text(relationtype+"+"+childname+"+"+parentname));
//輸出右表
relationtype="2";
context.write(new Text(values[0]),new Text(relationtype+"+"+childname+"+"+parentname));
}
}
}
public static class Reduce extends Reducer<Text,Text,Text,Text>{
//實現reduce函式
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//輸出表頭
if (0==time){
context.write(new Text("grandchild"),new Text("grandparent"));
time++;
}
int grandchildnum =0;
String[] grandchild = new String[10];
int grandparentnum =0;
String[] grandparent = new String[10];
Iterator ite = values.iterator();
while (ite.hasNext()){
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (len==0){
continue;
}
//取得左右表標識
char relationtype = record.charAt(0);
//定義孩子和父母變數
String childname = new String();
String parentname = new String();
//獲取value-list中value的child
while (record.charAt(i)!='+'){
childname+=record.charAt(i);
i++;
}
i=i+1;
//獲取value-list中value的parent
while (i<len){
parentname+=record.charAt(i);
i++;
}
//左表,取出child放入grandchildren
if ('1'==relationtype){
grandchild[grandchildnum] = childname;
grandchildnum++;
}
//右表,取出parent放入grandparent
if ('2'==relationtype){
grandparent[grandparentnum]=parentname;
grandparentnum++;
}
}
//grandchild和grandparent陣列求笛卡爾積
if (0!=grandchildnum&&0!=grandparentnum){
for (int m =0;m<grandchildnum;m++){
for(int n=0;n<grandparentnum;n++){
//輸出結果
context.write(new Text(grandchild[m]),new Text(grandparent[n]));
}
}
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//這句話很關鍵
conf.set("mapred.job.tracker","192.168.113.130");
String[] ioArgs = new String[]{"STjoin_in","STjoin_out"};
String[] otherArgs = new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
if (otherArgs.length!=2){
System.err.println("Useage:Data STjoin <in> <out>");
System.exit(2);
}
Job job = new Job(conf,"Data STjoin");
job.setJarByClass(Sort.class);
//設定Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//設定輸出型別
job.setOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
//設定輸入輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
boolean flag = job.waitForCompletion(true);
if (flag){
System.out.println("單表關聯操作成功!");
}else {
System.out.println("單表關聯操作失敗!");
}
System.exit(1);
}
}