使用MapReduce對Hadoop下的日誌記錄進行分析處理
一. 簡介
MapReduce是一個高效能的批處理分散式計算框架,用於對海量資料進行並行分析和處理。與傳統方法相比較,MapReduce更傾向於用蠻力去解決問題,通過簡單、粗暴、有效的方式去處理海量的資料。它通過對資料的輸入、拆分與組合(核心),將任務分配到多個節點伺服器上進行分散式計算,這樣可以有效地提高資料管理的安全性,同時也能夠很好地擴展可管理的資料規模。
MapReduce的核心就是map + shuffle + reduce:首先讀取輸入檔案並將其切分成若干分片(split),每個分片交給一個map任務,由map把資料轉換成key-value對,作為reducer的輸入;在交給reducer之前,框架會先對map的輸出進行shuffle,即按key分區、排序,並把相同key的value歸併到一起,然後將排好序的key-value交給reducer進行reduce操作。當然,一個MapReduce任務也可以沒有reduce階段,只執行map(即map-only任務)。
其實現在MapReduce在很多場景下已經被Spark取代了,不過作為對大資料的學習,還是要稍微瞭解一下,下面是我學習過程中看過和寫過的例子。
二. Hadoop自帶的WordCount
2.1. 建立一個Maven專案,目錄結構如下:
2.2. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the MapReduce examples in this article; packaged as hadoop-MapReduce-1.0-SNAPSHOT.jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>sun</groupId>
    <artifactId>hadoop-MapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- NOTE(review): no maven-compiler-plugin configuration and no project.build.sourceEncoding are set,
         so the Java source/target level and the source file encoding fall back to Maven/platform defaults. -->
    <properties>
        <!-- Single version shared by every org.apache.hadoop artifact below. -->
        <hadoopVersion>2.6.0</hadoopVersion>
    </properties>
    <dependencies>
        <!-- Hadoop start -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <!-- HBase 1.1.2. NOTE(review): hbase-it is HBase's integration-test artifact; presumably it is used
             here to pull in the client and MapReduce classes (HBaseAdmin, TableMapReduceUtil, ...)
             transitively. Depending on hbase-client / hbase-server directly would be more explicit; confirm. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- Hadoop -->
        <!-- tools.jar as a system-scoped dependency, read from JAVA_HOME/lib/tools.jar. That file was removed
             in JDK 9 (JEP 220), so this assumes JAVA_HOME points at a JDK 8 or older. -->
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <!-- NOTE(review): the RELEASE meta-version resolves to whatever release is newest at build time,
             so builds are not reproducible; consider pinning a concrete version. -->
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations-java5</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
2.3. WordCount.java
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)])); System.exit(job.waitForCompletion(true) ? 
0 : 1); } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } this.result.set(sum); context.write(key, this.result); } } public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private static final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(this.word, one); } } } }
三. 我的例子:
3.1. 將WordCount的結果上傳到HBase:
WordCountUpLoadToHBase.java:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
public class WordCountUpLoadToHBase extends Configured {
public static class WCHBaseMapper extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
StringTokenizer strs = new StringTokenizer(value.toString());
while(strs.hasMoreTokens()){
word.set(strs.nextToken());
context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one);
}
}
}
public static class WCHBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>{
public void reduce(ImmutableBytesWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
int sum = 0;
for(IntWritable val:values){
sum += val.get();
}
Put put = new Put(key.get());
put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.toBytes(sum+""));
context.write(key, put);
}
}
@SuppressWarnings("all")
public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
String tableName = "wordcount";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","hadoop");
conf.set("hbase.zookeeper.property.clientPort","2181");
HBaseAdmin admin = new HBaseAdmin(conf);
//如果表格存在就刪除
if(admin.tableExists(tableName)){
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnDescriptor =new HColumnDescriptor("content");
tableDescriptor.addFamily(columnDescriptor);
admin.createTable(tableDescriptor);
Job job = new Job(conf,"upload to hbase");
job.setJarByClass(WordCountUpLoadToHBase.class);
job.setMapperClass(WCHBaseMapper.class);
TableMapReduceUtil.initTableReducerJob(tableName, WCHBaseReducer.class, job,null,null,null,null,false);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
FileInputFormat.addInputPaths(job, "hdfs://hadoop:9000/agentlog/*");
System.exit(job.waitForCompletion(true)?0:1);
}
}
3.2. 從HBase讀取資料
MRReadFromHbase.java:
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Copies the HBase table {@code wordcount} to text files on HDFS: one line per row, formatted as
 * {@code <row key><TAB><values of all content:* columns concatenated in qualifier order>}.
 */
public class MRReadFromHbase extends Configured {

    /** Column family read from every row; plain ASCII, so the default charset does not matter here. */
    private static final byte[] FAMILY = "content".getBytes();

    /** Turns one HBase row into a (row key, concatenated content values) text pair. */
    public static class WCHBaseMapper extends TableMapper<Text, Text> {
        @Override
        public void map(ImmutableBytesWritable key, Result values, Context context) throws IOException, InterruptedException {
            // May be null (row without cells) or empty (row without this family): nothing to emit.
            Map<byte[], byte[]> columns = values.getFamilyMap(FAMILY);
            if (columns == null || columns.isEmpty()) {
                return;
            }
            StringBuilder sb = new StringBuilder();
            for (byte[] cellValue : columns.values()) {
                // Assumes the cell values are UTF-8 text (e.g. written with HBase Bytes.toBytes(String)).
                sb.append(new String(cellValue, "UTF-8"));
            }
            // Emit once per row, after all columns are concatenated.
            Text rowKey = new Text();
            // Honour offset/length: the writable may wrap a slice of a larger array.
            rowKey.set(key.get(), key.getOffset(), key.getLength());
            context.write(rowKey, new Text(sb.toString()));
        }
    }

    /** Identity reducer: writes every (key, value) pair through unchanged. */
    public static class WCHBaseReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf, "read from hbase to hdfs");
        job.setJarByClass(MRReadFromHbase.class);

        // Only the mapper's family is needed. Fetch rows in batches rather than one RPC per row, and
        // keep a one-off full scan out of the region servers' block cache.
        Scan scan = new Scan();
        scan.addFamily(FAMILY);
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob(tableName, scan, WCHBaseMapper.class, Text.class, Text.class, job);
        job.setReducerClass(WCHBaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // NOTE(review): this HDFS host is "hadoop1" while the ZooKeeper quorum above is "hadoop";
        // confirm this is the intended NameNode.
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop1:9000/user/sun/hbase"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3.3. 我的自定義格式的日誌資料處理:
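日誌以[format:1][user:AAA][interface:/bt/btCourse/get][date:2018/10/10]的格式儲存,針對[format:1]開頭的資料,根據不同使用者user進行排序統計。注意:下面的程式碼實際讀取的是user、time、html三個欄位(例如[format:1][user:AAA][time:2018-10-10 10:00:00][html:/bt/btCourse/get]),其中time需為yyyy-MM-dd HH:mm:ss格式,否則時間差一律按0計算;使用時請讓欄位名與實際日誌保持一致。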
UserLog.java:
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
public class UserLog
{
public static void main(String[] args)
throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(UserLog.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// map將輸入中的value複製到輸出資料的key上,並直接輸出
public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
private static Text line = new Text();// 每行資料
// 實現map函式
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().startsWith("[format:1]")) {
context.write(new Text(getParameter(value, "user") + "|" + getParameter(value, "time") + "|" + getParameter(value, "html")), new Text(""));
}
}
}
/* // reduce將輸入中的key複製到輸出資料的key上,並直接輸出
public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
// 實現reduce函式
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text val : values) {
context.write(key, val);
}
}
}*/
public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
//定義treeMap來保持統計結果,由於treeMap是按key升序排列的,這裡要人為指定Comparator以實現倒排
private TreeMap<String, String> treeMap = new TreeMap<String, String>(new Comparator<String>() {
//@Override
public int compare(String x, String y) {
return x.compareTo(y);
}
});
//定義treeMap來保持統計結果,由於treeMap是按key升序排列的,這裡要人為指定Comparator以實現倒排
private TreeMap<String, Long> treeMapResult = new TreeMap<String, Long>(new Comparator<String>() {
//@Override
public int compare(String x, String y) {
return x.compareTo(y);
}
});
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//reduce後的結果放入treeMap,而不是向context中記入結果
treeMap.put(key.toString(), key.toString());
}
protected void cleanup(Context context) throws IOException, InterruptedException {
if (StringUtils.isBlank(context.getCurrentValue().toString())) {
Iterator it = treeMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
context.write(new Text(entry.getKey().toString()), new Text("0"));
}
}else{
String key = "";
String value = "";
//將treeMap中的結果,按value-key順序寫入contex中
Iterator it = treeMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
String[] sp = entry.getKey().toString().split("\\|");
if (key.equals(sp[0])){
long time = getSecondDiff(value, sp[1]);
if (!treeMapResult.containsKey(sp[0] + "|" + sp[2])) {
treeMapResult.put(sp[0] + "|" + sp[2], time);
} else {
treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong(treeMapResult.get(sp[0] + "|" + sp[2]).toString()) + time);
}
}else{
treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong("0"));
}
key = sp[0];
value = sp[1];
}
// 輸出
Iterator iter = treeMapResult.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
context.write(new Text(entry.getKey().toString()), new Text(entry.getValue().toString()));
}
}
}
}
private static String getParameter(Text value, String param){
try {
return value.toString().substring(value.toString().indexOf(param) + param.length() + 1, value.toString().indexOf("]", value.toString().indexOf(param) + param.length() + 1));
}catch(Exception e){
return "";
}
}
private static long getSecondDiff(String s1, String s2){
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date d1 = null;
Date d2 = null;
try {
d1 = format.parse(s1);
d2 = format.parse(s2);
//毫秒ms
long diff = d2.getTime() - d1.getTime();
long diffSeconds = diff / 1000;
return diffSeconds;
}catch(Exception e){
return 0;
}
}
}
四. 結果驗證:
4.1. 將專案打成Jar包後,放到CentOS上的/home/hadoop/Downloads目錄下。
4.2. 執行(主類名區分大小寫,需寫成WordCount類的完整類名):
hadoop jar /home/hadoop/Downloads/hadoop-MapReduce-1.0-SNAPSHOT.jar org.apache.hadoop.examples.WordCount /agentlog/ /user/sun/MapReduce/wordCountX
4.3. 檢視結果:
hadoop fs -cat /user/sun/MapReduce/wordCountX/part-r-00000