
Statistical Analysis of Large Volumes of Email with MapReduce on HBase


Part 1: Overview

In most cases, when MapReduce is used for batch processing, the input files are stored on HDFS. There is, however, an important scenario that should not be overlooked: processing very large numbers of small files (there is no strict definition of "small"; it generally means files of only a few MB, say under 5 MB), whereas an HDFS block is typically 64 MB. Too many small files hurt HDFS performance, because the NameNode keeps the metadata of every file in memory, so more files mean more metadata and a heavier NameNode load. If the metadata kept in the NameNode for each file takes about 100 bytes, then 10 million such small files occupy roughly 1 billion bytes, about 1 GB of memory. Common ways of dealing with large numbers of small files are:

  • HAR files: merge the small files into larger archive files

  • SequenceFile: use the file name as the key and the file content as the value, and write everything into one sequence file

  • HBase: store the small files in HBase, with the file name as the rowkey and the file content, file extension and other attributes kept in column-family cells

The example in this article uses the third approach.

Part 2: Implementation

1: The email format is as follows; for simplicity and security it has been simplified here. Each such email is roughly 15 KB in size.

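The sample email screenshot from the original post is not reproduced here. As a hypothetical illustration based only on the two fields the MapReduce job extracts later (Project Name, whose part before the underscore is the user name, and Accounts, the number to be summed), a simplified email body might look roughly like this:

Subject: Daily usage report
Project Name:alice_dailyreport
Accounts:25
...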

2: HBase tables

Two HBase tables are needed: one to store the email files and another to store the MapReduce output. Create them in the hbase shell:

create 'email', {NAME=>'f1', VERSIONS=>2}
create 'summary', {NAME=>'f1', VERSIONS=>2}

3: Importing the email files into HBase

See the previous article, "Importing files into HBase through the API (small-file handling)". Here we assume that all the email files generated each day have already been loaded from the local file system into HBase. An alternative is to build a standalone RESTful API that third-party programs can call to write the email data into HBase. A rough loader sketch is shown below.
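As a minimal sketch only (this is not the code from the earlier article), a single email file could be written into the email table as follows. The local path and file name are hypothetical; the rowkey and the f1:message / f1:suffix cells follow the table layout used by the MapReduce job below.

package examples;

import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class EmailLoader {
    public static void main(String[] args) throws Exception {
        String path = "/data/mails/20180101_0001.txt";      // hypothetical local email file
        String fileName = Paths.get(path).getFileName().toString();
        byte[] content = Files.readAllBytes(Paths.get(path));

        Configuration conf = HBaseConfiguration.create();    // reads hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("email"))) {
            Put put = new Put(Bytes.toBytes(fileName));       // rowkey = file name
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("message"), content);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("suffix"), Bytes.toBytes("txt"));
            table.put(put);
        }
    }
}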

4: MapReduce

  • Create a Maven project in IDEA


  • Edit pom.xml and add the hbase-client and hbase-server dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>mronhbase</groupId>
    <artifactId>mronhbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>apache</id>
            <url>http://maven.apache.org</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Main-Class of the shaded jar: the driver class defined below -->
                                    <mainClass>examples.hbasemr</mainClass>
                                </transformer>
                            </transformers>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
  • Create the Java class, importing the required packages first:
package examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Job;
  • The class file hbasemr, which contains the main program entry point:
public class hbasemr {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
    {
        String hbaseTableName1 = "email";
        String hbaseTableName2 = "summary";

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(hbasemr.class);
        job.setJobName("mronhbase");

        Scan scan = new Scan();
        scan.setCaching(500);        // fetch more rows per RPC for the full-table scan
        scan.setCacheBlocks(false);  // don't pollute the block cache with MapReduce scans

        TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, MyMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(hbaseTableName2, MyReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
    public static String getSubString(String value,String rgex){
        Pattern pattern = Pattern.compile(rgex);
        Matcher m = pattern.matcher(value);
        if (m.find()) {
            return m.group(1);
        }
        return "";
    }
}
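A quick, hypothetical check of the regex helper (assumed to live in the same examples package); the field names match the patterns used by the mapper below:

package examples;

public class RegexCheck {
    public static void main(String[] args) {
        // the literal \r\n in the sample matches the \\r\\n in the patterns
        String sample = "Project Name:alice_dailyreport\r\nAccounts:25\r\n";
        System.out.println(hbasemr.getSubString(sample, "Project Name:(.*?)\\r\\n"));   // alice_dailyreport
        System.out.println(hbasemr.getSubString(sample, "Accounts:(.*?)\\r\\n").trim()); // 25
    }
}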

  • Add the Mapper and Reducer classes:

HBase provides the TableMapper and TableReducer base classes for jobs that read from and write to HBase tables.

Create MyMapper:

    public static class MyMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        public void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException
        {
            // read the raw email body from column f1:message
            String rowValue = Bytes.toString(value.getValue("f1".getBytes(), "message".getBytes()));
            if (rowValue != null) {

                // the user name is the part of the project name before the underscore
                String rgex = "Project Name:(.*?)\\r\\n";
                String temp = hbasemr.getSubString(rowValue, rgex);
                String username = temp.substring(0, temp.indexOf('_'));

                rgex = "Accounts:(.*?)\\r\\n";
                String count = hbasemr.getSubString(rowValue, rgex).trim();

                IntWritable intCount = new IntWritable(Integer.parseInt(count));
                context.write(new Text(username), intCount);
            }
        }
    }

Create MyReducer:

    public static class MyReducer extends TableReducer<Text,IntWritable, NullWritable>
    {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            Iterator<IntWritable> item = values.iterator();
            while (item.hasNext()) {
                sum += item.next().get();
            }
            this.result.set(sum);

            // one summary row per user: rowkey = user name, f1:count = total account count
            Put put = new Put(key.toString().getBytes());
            put.addColumn("f1".getBytes(), "count".getBytes(), String.valueOf(sum).getBytes());
            context.write(NullWritable.get(), put);
        }
    }

5: Run and debug

Start Hadoop and HBase, set breakpoints in the MyMapper and MyReducer classes in IDEA, and run the program in debug mode. When the job has finished, open the hbase shell to check the results.
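The original post showed a screenshot of the shell output here. As a sketch, the summary table can be inspected like this (the rowkey 'alice' is a hypothetical user name; real rowkeys depend on your data):

scan 'summary'
get 'summary', 'alice', {COLUMN => 'f1:count'}   # 'alice' is a hypothetical rowkey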

