09. MapReduce Example: ChainMapReduce

Experiment Principle

Some complex tasks cannot be completed in a single MapReduce pass and require several MapReduce jobs. Starting with Hadoop 2.0, MapReduce jobs support chained processing, much like an assembly line in a factory: each stage has a specific task, for example supplying parts -> assembling -> printing the production date, and so on. Dividing the work this way improves production efficiency. Chained MapReduce in Hadoop works the same way: the Mappers process data stage by stage, like water flowing downstream, somewhat like a Linux pipeline. The output of one Mapper can be fed directly into the next Mapper as its input, forming a processing pipeline.

Execution rule for chained MapReduce: the entire job may contain only one Reducer; one or more Mappers may be placed before the Reducer, and zero or more Mappers may be placed after it.

Hadoop 2.0 supports the following three kinds of chained MapReduce jobs:

(1) Sequentially chained MapReduce jobs

Similar to a Unix pipeline: mapreduce-1 | mapreduce-2 | mapreduce-3 ... Each stage creates its own job and sets its input path to the output path of the previous stage. After the final stage, the intermediate data produced along the chain is deleted. A sketch of this pattern is shown below.
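A minimal sketch of sequential chaining with Hadoop's mapreduce API; the job names, the commented-out per-job settings, and the /tmp/... paths are placeholders rather than part of the original experiment, and exception handling and the enclosing main method are omitted:

Configuration conf = new Configuration();

// Stage 1: run the first job to completion before starting the second one.
Job job1 = Job.getInstance(conf, "stage-1");
// ... set input path, Mapper, Reducer and output types for job1 here ...
FileOutputFormat.setOutputPath(job1, new Path("/tmp/stage1-out"));
if (!job1.waitForCompletion(true)) {
    System.exit(1);   // abort the chain if stage 1 fails
}

// Stage 2: the output directory of job1 becomes the input of job2.
Job job2 = Job.getInstance(conf, "stage-2");
// ... set Mapper, Reducer and output types for job2 here ...
FileInputFormat.addInputPath(job2, new Path("/tmp/stage1-out"));
FileOutputFormat.setOutputPath(job2, new Path("/tmp/stage2-out"));
boolean ok = job2.waitForCompletion(true);

// Delete the intermediate data produced along the chain.
FileSystem.get(conf).delete(new Path("/tmp/stage1-out"), true);
System.exit(ok ? 0 : 1);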

(2) MapReduce chains with complex dependencies

Suppose mapreduce-1 processes one dataset, mapreduce-2 processes another dataset, and mapreduce-3 performs an inner join on the results of the first two. In this situation the dependencies between the non-linear jobs are managed through the Job and JobControl classes; for example, x.addDependingJob(y) means that x will not start until y has finished. A sketch follows.
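A minimal sketch of such dependency management using the org.apache.hadoop.mapreduce.lib.jobcontrol API, assuming job1, job2 and job3 are already-configured Job objects (the names here are placeholders):

// Wrap each Job in a ControlledJob so that dependencies can be declared.
ControlledJob cj1 = new ControlledJob(job1.getConfiguration());
cj1.setJob(job1);
ControlledJob cj2 = new ControlledJob(job2.getConfiguration());
cj2.setJob(job2);
ControlledJob cj3 = new ControlledJob(job3.getConfiguration());
cj3.setJob(job3);

// job3 joins the outputs of job1 and job2, so it must not start before both have finished.
cj3.addDependingJob(cj1);
cj3.addDependingJob(cj2);

JobControl control = new JobControl("join-chain");
control.addJob(cj1);
control.addJob(cj2);
control.addJob(cj3);

// JobControl is a Runnable: run it in its own thread and poll until all jobs are done.
Thread t = new Thread(control);
t.start();
while (!control.allFinished()) {
    Thread.sleep(1000);   // exception handling omitted in this sketch
}
control.stop();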

(3) Chaining of pre-processing and post-processing steps

Pre-processing and post-processing steps are generally written as Mapper tasks. They can be chained by hand, or with the ChainMapper and ChainReducer classes; the resulting job has the form:

MAP+ | REDUCE | MAP*

Take the job Map1 | Map2 | Reduce | Map3 | Map4 as an example: Map2 and the Reduce form the core of the MapReduce job, Map1 is a pre-processing step, and Map3 and Map4 are post-processing steps. ChainMapper is used to add the pre-processing Mappers, while ChainReducer is used to set the Reducer and to add the post-processing Mappers (see the driver code in step 6 below).

This experiment uses the third mode, chained pre-processing and post-processing; the resulting job has the form Map1 | Map2 | Reduce | Map3.

1. Create the data file with comma-separated fields.
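The goods_0 file itself is not reproduced here. Judging from the mapper code in step 6, each line is assumed to contain a goods identifier and a click count separated by a comma, for example (hypothetical values):

10,35
11,721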

2. Create the /data/mapreduce10 directory on the local machine.

mkdir -p /data/mapreduce10

3. Upload the data file to the virtual machine.

4. Upload and extract the hadoop2lib archive.

5. Create the /mymapreduce10/in directory on HDFS, then import the goods_0 file from the local /data/mapreduce10 directory into the /mymapreduce10/in directory on HDFS.

hadoop fs -mkdir -p /mymapreduce10/in

hadoop fs -put /data/mapreduce10/goods_0 /mymapreduce10/in

6. Write the Java code in IDEA.

package mapreduce9;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DoubleWritable;
public class ChainMapReduce {
    private static final String INPUTPATH = "hdfs://192.168.149.10:9000/mymapreduce10/in/goods_0";
    private static final String OUTPUTPATH = "hdfs://192.168.149.10:9000/mymapreduce10/out";
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Delete the output directory if it already exists, so the job can be rerun.
            FileSystem fileSystem = FileSystem.get(new URI(OUTPUTPATH), conf);
            if (fileSystem.exists(new Path(OUTPUTPATH))) {
                fileSystem.delete(new Path(OUTPUTPATH), true);
            }
            Job job = Job.getInstance(conf, ChainMapReduce.class.getSimpleName());
            FileInputFormat.addInputPath(job, new Path(INPUTPATH));
            job.setInputFormatClass(TextInputFormat.class);
            // Build the chain: FilterMapper1 | FilterMapper2 | SumReducer | FilterMapper3
            ChainMapper.addMapper(job, FilterMapper1.class, LongWritable.class, Text.class, Text.class, DoubleWritable.class, conf);
            ChainMapper.addMapper(job, FilterMapper2.class, Text.class, DoubleWritable.class, Text.class, DoubleWritable.class, conf);
            ChainReducer.setReducer(job, SumReducer.class, Text.class, DoubleWritable.class, Text.class, DoubleWritable.class, conf);
            ChainReducer.addMapper(job, FilterMapper3.class, Text.class, DoubleWritable.class, Text.class, DoubleWritable.class, conf);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setPartitionerClass(HashPartitioner.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));
            job.setOutputFormatClass(TextOutputFormat.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // Pre-processing Mapper 1: parse each comma-separated line and keep records whose click count is <= 600.
    public static class FilterMapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        private Text outKey = new Text();
        private DoubleWritable outValue = new DoubleWritable();
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.length() > 0) {
                String[] splits = line.split(",");
                double visit = Double.parseDouble(splits[1].trim());
                if (visit <= 600) {
                    outKey.set(splits[0]);
                    outValue.set(visit);
                    context.write(outKey, outValue);
                }
            }
        }
    }
    // Pre-processing Mapper 2: of the records passed on by FilterMapper1, keep only those with a click count < 100.
    public static class FilterMapper2 extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void map(Text key, DoubleWritable value, Mapper<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            if (value.get() < 100) {
                context.write(key, value);
            }
        }
    }
    // Core Reducer: sum the click counts for each key.
    public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private DoubleWritable outValue = new DoubleWritable();
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
            }
            outValue.set(sum);
            context.write(key, outValue);
        }
    }
    // Post-processing Mapper: keep only the records whose key is shorter than 3 characters.
    public static class FilterMapper3 extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void map(Text key, DoubleWritable value, Mapper<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            if (key.toString().length() < 3) {
                System.out.println("Record written out: " + key.toString() + "++++" + value.toString());
                context.write(key, value);
            }
        }
    }
}

7. Copy the jar packages from the hadoop2lib directory into the project and add them as dependencies.

8. Copy the log4j.properties file into the project so that log output is configured.
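If no log4j.properties comes with the hadoop2lib package, a minimal console-only configuration such as the following (an assumption, not part of the original material) is sufficient:

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n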

9. Run the program and check the result.

hadoop fs -ls /mymapreduce10/out

hadoop fs -cat /mymapreduce10/out/part-r-00000