MapReduce: deduplicate two types of IPs and output them to files in different directories
We have a dataset containing two types of IPs, distinguished by a flag field. The task is to deduplicate the records, store the two types of IPs in separate files by type, and discard records of any third type.
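For concreteness, each input record is assumed to be one JSON line per request. The field names below come from the mapper further down; the values are only a hypothetical illustration:

{"time":"2018-11-19 10:23:45","remote_addr":"10.0.0.1","decrypted_data":"{\"userAgent\":\"Mozilla/5.0\"}"}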
Key points:
1. Custom partitioning: extend the Partitioner class and override its getPartition() method;
2. Multi-path output: usage of the MultipleOutputs class.
The MapReduce program is as follows:
Maven pom file with the dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.2144</groupId>
<artifactId>dataclean</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.5</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.5</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.6.5</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd-core</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20171018</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.56</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
<build>
<!-- Name of the output jar -->
<finalName>hm_distinct_ip</finalName>
<plugins>
<!-- Packaging (shade) plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>com.js.dataclean.distinctip.DistIPDriver</Main-Class>
</manifestEntries>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
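Since the shade plugin writes the driver class into the jar manifest's Main-Class entry, the job can be built and submitted roughly like this (the input and output paths are placeholders):

mvn clean package
hadoop jar target/hm_distinct_ip.jar /path/to/input /path/to/output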
Mapper class
package com.js.dataclean.distinctip;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.json.JSONObject;
import java.io.IOException;
public class DistIPMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is a JSON record
        String str = value.toString();
        JSONObject jsonObject = new JSONObject(str);
        String decrypted_data = jsonObject.getString("decrypted_data");
        // Keep only the date part of the timestamp
        String time = jsonObject.getString("time").split(" ")[0];
        String ip = jsonObject.getString("remote_addr");
        // Tag the record according to the content of decrypted_data
        String flag = "";
        if (decrypted_data.contains("userAgent")) {
            flag = "userAgent";
        }
        if (decrypted_data.contains("success")) {
            flag = "success";
        }
        // Records of neither type are discarded, as the requirement states
        if (flag.isEmpty()) {
            return;
        }
        // Composite key: the shuffle phase will deduplicate (date, flag, ip) triples
        context.write(new Text(time + "\t" + flag + "\t" + ip), NullWritable.get());
    }
}
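Because the map output value is NullWritable and the key is the composite "date \t flag \t ip" string, all duplicates of the same (date, flag, ip) combination collapse into a single key during the shuffle, so each reduce() call sees a given combination exactly once. This is what performs the deduplication.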
Reducer class
package com.js.dataclean.distinctip;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
public class DistIPReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    private MultipleOutputs<Text, NullWritable> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // The key is already unique: "date \t flag \t ip"
        String[] arr = key.toString().split("\t");
        String time = arr[0];
        String flag = arr[1];
        String ip = arr[2];
        // Base output path: a date/flag subdirectory under the job output directory
        String name = time + "/" + flag + "/part";
        mos.write(new Text(ip), NullWritable.get(), name);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // MultipleOutputs must be closed, otherwise output may be lost
        mos.close();
    }
}
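The third argument of mos.write() is a base output path relative to the job's output directory, and because it contains "/" the files end up in nested subdirectories. Assuming the job output directory is /out and a record is dated 2018-11-19 (both just illustrative values), the layout looks roughly like:

/out/2018-11-19/success/part-r-00000
/out/2018-11-19/userAgent/part-r-00001

Together with LazyOutputFormat in the driver below, nothing is written through the default output, so no empty part-r-xxxxx files appear directly under /out.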
Custom partitioner class
package com.js.dataclean.distinctip;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class DistIPPartition extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
        // Route "success" keys to reducer 0, "userAgent" keys to reducer 1,
        // and everything else to reducer 2
        int flag = 2;
        if (text.toString().contains("success")) {
            flag = 0;
        }
        if (text.toString().contains("userAgent")) {
            flag = 1;
        }
        return flag;
    }
}
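Note that the value returned by getPartition() must stay below the number of reduce tasks, which is why the driver below calls job.setNumReduceTasks(3); with fewer reducers the job would fail with an illegal-partition error.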
Driver class
package com.js.dataclean.distinctip;
import com.js.dataclean.utils.HdfsUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DistIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: DistIPDriver <input path> <output path>");
            System.exit(1);
        }
        int exitCode = ToolRunner.run(new Configuration(), new DistIPDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Reuse the Configuration that ToolRunner passed in
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DistIPDriver.class);

        job.setMapperClass(DistIPMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(DistIPReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Set the partitioner and the matching number of reduce tasks
        job.setPartitionerClass(DistIPPartition.class);
        job.setNumReduceTasks(3);

        // Input and output paths
        String inpath = args[0];
        String output = args[1];

        // Merge small input files into larger splits
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMinInputSplitSize(job, 67108864L);   // each map handles at least 64 MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 134217728L);  // and at most 128 MB

        // Read input directories recursively
        FileInputFormat.setInputDirRecursive(job, true);
        FileInputFormat.setInputPaths(job, inpath);

        // Remove the output directory if it already exists
        if (HdfsUtil.existsFiles(conf, output)) {
            HdfsUtil.deleteFolder(conf, output);
        }
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Only create output files when something is actually written
        // (MultipleOutputs does the real writing in the reducer)
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
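The driver imports com.js.dataclean.utils.HdfsUtil, which is not included in the post. A minimal sketch that matches the two calls used above (the class and method names come from the driver; the implementation itself is an assumption) could look like this:

package com.js.dataclean.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;

// Hypothetical helper; only the two methods used by DistIPDriver are sketched
public class HdfsUtil {

    // Returns true if the given path already exists on HDFS
    public static boolean existsFiles(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    // Recursively deletes the given directory
    public static boolean deleteFolder(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.delete(new Path(path), true);
    }
}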