1. 程式人生 > >Hadoop整合Hbase,處理完的資料直接存入Hbase中

Hadoop整合Hbase,處理完的資料直接存入Hbase中

Hadoop可以清洗、計算TB級別的資料;清洗結束後的結果可以存入HDFS,也可以直接存入HBase,以便快速查詢。

1.Hbase中需要建立一張表用來儲存HDFS清洗後的資料:

hbase(main):014:0> create_namespace 'hdfs'    //構建表空間
0 row(s) in 0.9030 seconds

hbase(main):015:0> create 'hdfs:product','info'  //表空間下構建表
0 row(s) in 1.2830 seconds

=> Hbase::Table - hdfs:product

hbase(main):016:0> desc 'hdfs:product'
Table hdfs:product is ENABLED                                                                                                                                                                
hdfs:product                                                                                                                                                                                 
COLUMN FAMILIES DESCRIPTION                                                                                                                                                                  
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERS
IONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                                                                           
1 row(s) in 0.1070 seconds

2.準備資料:資料來自Oracle的兩張表——訂單表和商品表。需求是:求每年每種商品的銷售總額,要求顯示商品名稱、年份、每年銷售總額,並將結果存入HBase中。

3.構建Hadoop程式

Mapper端負責清洗資料,並依來源檔案(訂單或商品)輸出不同的內容;兩者都以商品ID作為key,使相同key的資料在Reducer端彙總

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Mapper for the product/sales join.
 *
 * <p>Each input line is a comma-separated record from one of two files. The
 * name of the file backing the current split decides how the line is read:
 * <ul>
 *   <li>file {@code sales}: emits {@code (field 0, "<first 4 chars of field 2>:<field 6>")},
 *       which the reducer reads as {@code year:amount};</li>
 *   <li>any other file: emits {@code (field 0, "*<field 1>")}; the leading
 *       {@code *} marks the value as a product name.</li>
 * </ul>
 * Field 0 is parsed as an integer and used as the output key in both cases.
 *
 * <p>Records that are too short or whose key is not an integer are skipped
 * and counted under the {@code ProductMapper / MALFORMED_RECORDS} job counter
 * instead of failing the task.
 */
public class ProductMapper extends Mapper<LongWritable,Text, IntWritable, Text> {
    /** Counter group and name used to report skipped input lines. */
    private static final String COUNTER_GROUP = "ProductMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /** Minimum number of comma-separated fields a sales line must have (fields 0, 2 and 6 are read). */
    private static final int SALES_MIN_FIELDS = 7;
    /** Minimum number of comma-separated fields a product line must have (fields 0 and 1 are read). */
    private static final int PRODUCT_MIN_FIELDS = 2;
    /** Number of leading characters of the sales date field that are kept as the year. */
    private static final int YEAR_LENGTH = 4;

    // Name of the file behind the current input split; tells the two tables apart.
    String fileName;
    // Reused output objects to avoid allocating per record.
    Text v = new Text();
    IntWritable k = new IntWritable();

    /**
     * Records the name of the file this mapper's split belongs to.
     *
     * @param context task context; its input split is cast to {@link FileSplit}
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit)context.getInputSplit();
        fileName = split.getPath().getName();
    }

    /**
     * Converts one input line into a keyed value for the reducer.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one comma-separated input line
     * @param context task context used to emit output and report skipped lines
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(",");
        boolean isSales = fileName.equals("sales");

        // Reject lines that do not carry every field read below.
        int requiredFields = isSales ? SALES_MIN_FIELDS : PRODUCT_MIN_FIELDS;
        if (words.length < requiredFields || (isSales && words[2].length() < YEAR_LENGTH)) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        try {
            k.set(Integer.parseInt(words[0]));
        } catch (NumberFormatException e) {
            // Non-numeric key (e.g. a header line): skip the record, keep the job alive.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        if (isSales) {
            v.set(words[2].substring(0, YEAR_LENGTH) + ":" + words[6]);
        } else {
            v.set("*" + words[1]);
        }
        context.write(k, v);
    }
}

2.整合hbase Reducer端用TableReducer,處理資料插入表中;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;


/**
 * Reducer that joins product names with sales records and writes yearly sales
 * totals into HBase.
 *
 * <p>For every key it receives a mix of two value shapes:
 * <ul>
 *   <li>{@code "*<name>"} - the product name;</li>
 *   <li>{@code "<year>:<amount>"} - one sale.</li>
 * </ul>
 * Amounts are summed per year, and one {@link Put} is emitted per
 * (key, year) pair with row key {@code "<key>_<year>"} and the columns
 * {@code info:name}, {@code info:year_id} and {@code info:amount_sold}
 * (all stored as strings).
 *
 * <p>All state is local to a single {@code reduce} call. Keeping the running
 * totals, name and year in instance fields would carry them over from one key
 * to the next, because the same reducer object handles many keys. Columns are
 * only written after all values of the key have been consumed, since the name
 * record is not guaranteed to arrive before the sales records.
 *
 * <p>A key with no sales records produces no rows. A key with sales but no
 * name record produces rows without the {@code info:name} column.
 *
 * <p>NOTE(review): amounts are summed as {@code double}, so stored totals can
 * show floating-point rounding artifacts.
 */
public class HbaseDemo extends TableReducer<IntWritable, Text, NullWritable> {
    private static final byte[] FAMILY = Bytes.toBytes("info");
    private static final byte[] COL_NAME = Bytes.toBytes("name");
    private static final byte[] COL_YEAR = Bytes.toBytes("year_id");
    private static final byte[] COL_AMOUNT = Bytes.toBytes("amount_sold");

    /**
     * Aggregates the sales of one key by year and writes one row per year.
     *
     * @param key     key emitted by the mapper (used as the row-key prefix)
     * @param values  name marker and/or {@code year:amount} records for the key
     * @param context task context used to emit the {@link Put}s
     * @throws IOException if a value is neither a name marker nor a
     *                     {@code year:amount} record, or if writing fails
     */
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String productName = null;
        HashMap<Integer, Double> totalsByYear = new HashMap<Integer, Double>();

        for (Text v : values) {
            String data = v.toString();

            // "*<name>": remember the product name for the rows written below.
            if (data.startsWith("*")) {
                productName = data.substring(1);
                continue;
            }

            // "<year>:<amount>": add the amount to that year's running total.
            int sep = data.indexOf(':');
            if (sep < 0) {
                throw new IOException("Unexpected value for key " + key + ": " + data);
            }
            int year = Integer.parseInt(data.substring(0, sep));
            double amount = Double.parseDouble(data.substring(sep + 1));
            Double soFar = totalsByYear.get(year);
            totalsByYear.put(year, soFar == null ? amount : soFar + amount);
        }

        // One row per (key, year); the row key must be unique per pair so that
        // years of the same product do not overwrite each other.
        for (java.util.Map.Entry<Integer, Double> entry : totalsByYear.entrySet()) {
            String year = String.valueOf(entry.getKey());
            Put put = new Put(Bytes.toBytes(key.toString() + "_" + year));
            if (productName != null) {
                put.addColumn(FAMILY, COL_NAME, Bytes.toBytes(productName));
            }
            put.addColumn(FAMILY, COL_YEAR, Bytes.toBytes(year));
            put.addColumn(FAMILY, COL_AMOUNT, Bytes.toBytes(String.valueOf(entry.getValue())));
            context.write(NullWritable.get(), put);
        }
    }
}

3.構建Job 主程式

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Driver that configures and runs the product/sales job: {@link ProductMapper}
 * reads the input files and {@link HbaseDemo} writes the result into the
 * HBase table {@code hdfs:product} through {@link TableOutputFormat}.
 */
public class ProductMain {
    /** Input location used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH = "D:\\tmp\\prod";

    /**
     * Runs the job and waits for it to finish.
     *
     * @param args optional; {@code args[0]} is the input path. When absent,
     *             {@link #DEFAULT_INPUT_PATH} is used.
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception{
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        Configuration conf = new Configuration();
        // ZooKeeper quorum used to reach HBase.
        conf.set("hbase.zookeeper.quorum", "192.168.128.111");
        // Target table for TableOutputFormat; the table must already exist.
        conf.set(TableOutputFormat.OUTPUT_TABLE,"hdfs:product");
        Job job = Job.getInstance(conf);
        // Register the HBase dependency jars with the job.
        TableMapReduceUtil.addDependencyJars(job);

        // Mapper side: class and intermediate key/value types.
        job.setJarByClass(ProductMain.class);
        job.setMapperClass(ProductMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer side: no output key/value classes or output path are set,
        // because the reducer's Puts go to the table configured above.
        job.setReducerClass(HbaseDemo.class);

        FileInputFormat.setInputPaths(job,new Path(inputPath));
        job.setOutputFormatClass(TableOutputFormat.class);

        // Surface a failed job through the process exit code.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}

4.檢視最後輸出結果:列族info下有三個列:銷售總額(amount_sold)、年份(year_id)、商品名稱(name),RowKey則是商品ID;

hbase(main):002:0> desc 'hdfs:product'
Table hdfs:product is ENABLED                                                                                                                                                                
hdfs:product                                                                                                                                                                                 
COLUMN FAMILIES DESCRIPTION                                                                                                                                                                  
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERS
IONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                                                                           
1 row(s) in 0.2440 seconds

hbase(main):003:0> scan 'hdfs:product'
ROW                                              COLUMN+CELL                                                                                                                                 
 113                                             column=info:amount_sold, timestamp=1538625381681, value=2.069043351997921E7                                                                 
 113                                             column=info:name, timestamp=1538625381681, value=CD-R Mini Discs                                                                            
 113                                             column=info:year_id, timestamp=1538625381681, value=1998                                                                                    
 114                                             column=info:amount_sold, timestamp=1538625381815, value=2.0313520240019318E7                                                                
 114                                             column=info:name, timestamp=1538625381815, value=Music CD-R                                                                                 
 114                                             column=info:year_id, timestamp=1538625381815, value=2000                                                                                    
 115                                             column=info:amount_sold, timestamp=1538625381926, value=1.883235699001709E7                                                                 
 115                                             column=info:name, timestamp=1538625381926, value=CD-RW                                                                                      
 115                                             column=info:year_id, timestamp=1538625381926, value=1999                                                                                    
 116                                             column=info:amount_sold, timestamp=1538625382035, value=2.041153556001948E7                                                                 
 116                                             column=info:name, timestamp=1538625382035, value=CD-RW                                                                                      
 116                                             column=info:year_id, timestamp=1538625382035, value=2000                                                                                    
 117                                             column=info:amount_sold, timestamp=1538625382179, value=2.3968519569992796E7                                                                
 117                                             column=info:name, timestamp=1538625382179, value=CD-R                                                                                       
 117                                             column=info:year_id, timestamp=1538625382179, value=2001                                                                                    
 118                                             column=info:amount_sold, timestamp=1538625382323, value=1.9332802020017046E7                                                                
 118                                             column=info:name, timestamp=1538625382323, value=OraMusic CD-R                                                                              
 118                                             column=info:year_id, timestamp=1538625382323, value=1999                                                                                    
 119                                             column=info:amount_sold, timestamp=1538625382478, value=2.1135087109982926E7                                                                
 119                                             column=info:name, timestamp=1538625382478, value=CD-R with Jewel Cases                                                                      
 119                                             column=info:year_id, timestamp=1538625382478, value=1998                                                                                    
 120                                             column=info:amount_sold, timestamp=1538625382690, value=2.077849573001311E7                                                                 
 120                                             column=info:name, timestamp=1538625382690, value=DVD-R Disc with Jewel Case                                                                 
 120                                             column=info:year_id, timestamp=1538625382690, value=2000                                                                                    
 121                                             column=info:amount_sold, timestamp=1538625383171, value=1.946866801001969E7                                                                 
 121                                             column=info:name, timestamp=1538625383171, value=DVD-RAM Jewel Case                                                                         
 121                                             column=info:year_id, timestamp=1538625383171, value=1999                                                                                    
 122                                             column=info:amount_sold, timestamp=1538625383269, value=2.4405262879992317E7                                                                
 122                                             column=info:name, timestamp=1538625383269, value=DVD-R Discs                                                                                
 122                                             column=info:year_id, timestamp=1538625383269, value=2001                                                                                    
 123                                             column=info:amount_sold, timestamp=1538625383269, value=1.9679875930018328E7                                                                
 123                                             column=info:name, timestamp=1538625383269, value=DVD-R Discs                                                                                
 123                                             column=info:year_id, timestamp=1538625383269, value=1999                                                                                    
 124                                             column=info:amount_sold, timestamp=1538625383417, value=2.4786349059991E7                                                                   
 124                                             column=info:name, timestamp=1538625383417, value=DVD-RW Discs                                                                               
 124                                             column=info:year_id, timestamp=1538625383417, value=2001                                                                                    
 125                                             column=info:amount_sold, timestamp=1538625383566, value=2.1444125459976837E7                                                                
 125                                             column=info:name, timestamp=1538625383566, value=3 1/2" Bulk diskettes                                                                      
 125                                             column=info:year_id, timestamp=1538625383566, value=1998                                                                                    

資料來源是:oracle 中sh使用者下的 銷售表sales 以及商品表 products

SQL> select count(*) from sales;

  COUNT(*)
----------
    918843
SQL> select count(*) from PRODUCTS;

  COUNT(*)
----------
	72

小結:Hadoop整合HBase,就是利用Hadoop的Mapper端以及HBase提供的TableReducer。我之前指定RowKey的時候重複使用了Key(Mapper的輸出Key),導致程式出現空指標異常;後來將Reducer的輸出Key設定為空(NullWritable),改為在構建Put的時候以Mapper端的Key指定rowKey(要保證唯一性)。Main方法需要注意:Reducer沒有檔案輸出,需要指定job.setOutputFormatClass(TableOutputFormat.class),也就是要輸出到一張表中;配置引數、構建job的時候需要指定表名稱:conf.set(TableOutputFormat.OUTPUT_TABLE,"hdfs:product")(這張表必須事先存在)。其他就是Zookeeper的配置引數。