1. 程式人生 > 其它 >01.Mapreduce例項——去重

01.Mapreduce例項——去重

實驗原理

“資料去重”主要是為了掌握和利用並行化思想來對資料進行有意義的篩選。統計大資料集上的資料種類個數、從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及資料去重。

資料去重的最終目標是讓原始資料中出現次數超過一次的資料在輸出檔案中只出現一次。在MapReduce流程中,map的輸出<key,value>經過shuffle過程聚整合<key,value-list>後交給reduce。我們自然而然會想到將同一個資料的所有記錄都交給一臺reduce機器,無論這個資料出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以資料作為key,而對value-list則沒有要求(可以設定為空)。當reduce接收到一個<key,value-list>時就直接將輸入的key複製到輸出的key中,並將value設定成空值,然後輸出<key,value>。

1.在Linux中開啟Hadoop

start-all.sh

2.在Linux本地新建/data/mapreduce2目錄。

mkdir-p/data/mapreduce2

3.從網上下載hadoop2lib,解壓到mapreduce資料夾下

4.在HDFS上新建/mymapreduce2/in目錄,然後將Linux本地/data/mapreduce2目錄下的buyer_favorite1檔案匯入到HDFS的/mymapreduce2/in目錄中。

hadoop fs -mkdir -p /mymapreduce2/in

hadoop fs -put /data/mapreduce2/buyer_favorite1 /mymapreduce2/in

5.在IDEA中編寫程式碼

package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.BasicConfigurator;

public class Filter{
public static class Map extends Mapper<Object , Text , Text , NullWritable>{
private static Text newKey

=new Text();
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
String line=value.toString();
System.out.println(line);
String arr[]=line.split(",");
newKey.set(arr[1]);
context.write(newKey, NullWritable.get());
System.out.println(newKey);
}
}
public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{
public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{
context.write(key,NullWritable.get());
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
System.setProperty("HADOOP_USER_NAME","root");
BasicConfigurator.configure(); //自動快速地使用預設Log4j環境
Configuration conf=new Configuration();
System.out.println("start");
Job job =new Job(conf,"filter");
job.setJarByClass(Filter.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

Path in=new Path("hdfs://192.168.149.10:9000/mymapreduce2/in/buyer_favorite1");
Path out=new Path("hdfs://192.168.149.10:9000/mymapreduce2/in/out");
FileInputFormat.addInputPath(job,in);
FileOutputFormat.setOutputPath(job,out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

6.建立resources資料夾,其中建立log4j.properties檔案

hadoop.root.logger=DEBUG, console
log4j.rootLogger = DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

7.匯入hadoop2lib的包

8.執行結果

注意:如果有報錯,配置一下這個 等號後跟你虛擬機器中的使用者名稱