Integrating Hadoop with HBase: Writing Processed Data Directly into HBase
阿新 · Published: 2018-12-13
Hadoop can cleanse and compute data at the TB scale. Once the cleansing is done, the results can be stored in HDFS, or written into HBase, which makes fast lookups convenient.
1. Create a table in HBase to store the data cleansed from HDFS:
hbase(main):014:0> create_namespace 'hdfs'        // create the namespace
0 row(s) in 0.9030 seconds

hbase(main):015:0> create 'hdfs:product','info'   // create the table under the namespace
0 row(s) in 1.2830 seconds
=> Hbase::Table - hdfs:product

hbase(main):016:0> desc 'hdfs:product'
Table hdfs:product is ENABLED
hdfs:product
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.1070 seconds
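If you prefer to create the namespace and table from code rather than the shell, a minimal sketch using the HBase 1.x Admin API should look roughly like this (CreateProductTable is a made-up class name; the ZooKeeper quorum is the one used in the driver below, adjust it to your cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateProductTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.128.111");   // assumption: same quorum as the driver below
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            admin.createNamespace(NamespaceDescriptor.create("hdfs").build());   // the 'hdfs' namespace
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf("hdfs:product"));
            table.addFamily(new HColumnDescriptor("info"));                      // single column family 'info'
            admin.createTable(table);
        }
    }
}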
2. Prepare the data. The source data comes from two Oracle tables, the sales (order) table and the products table. The requirement: compute the total sales amount of each product per year, showing the product name, the year, and the yearly sales total, and store the result in HBase.
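Before looking at the MapReduce code, it may help to state the target computation in plain Java: for every product and year, sum AMOUNT_SOLD. A tiny illustrative sketch (class and helper are hypothetical, independent of the actual job):

import java.util.HashMap;
import java.util.Map;

public class YearlySalesSpec {
    // product name -> (year -> total sales amount); this is the result the job must produce
    static Map<String, Map<Integer, Double>> totals = new HashMap<String, Map<Integer, Double>>();

    // Called once per sales record (hypothetical helper, for illustration only)
    static void add(String productName, int year, double amountSold) {
        totals.computeIfAbsent(productName, p -> new HashMap<Integer, Double>())
              .merge(year, amountSold, Double::sum);   // accumulate the yearly total per product
    }
}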
3. Build the Hadoop program
The Mapper side is where Hadoop cleanses the data; records sharing the same key are then gathered together for the reducer.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class ProductMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    // Two input tables, so we need the name of the file currently being read
    String fileName;
    Text v = new Text();
    IntWritable k = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the file name of the current split during initialization
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Use the file name to decide what to emit:
        // sales records carry "year:amount_sold", product records carry "*name"
        String data = value.toString();
        String[] words = data.split(",");
        if (fileName.equals("sales")) {
            k.set(Integer.parseInt(words[0]));                 // product id
            v.set(words[2].substring(0, 4) + ":" + words[6]);  // year:amount_sold
            context.write(k, v);
        } else {
            k.set(Integer.parseInt(words[0]));                 // product id
            v.set("*" + words[1]);                             // "*" marks a product-name record
            context.write(k, v);
        }
    }
}
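The mapper assumes two comma-separated export files named sales and products. The column positions it reads match the sh.SALES column order (PROD_ID, CUST_ID, TIME_ID, CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD) and the first two columns of sh.PRODUCTS (PROD_ID, PROD_NAME), with TIME_ID exported year-first. As a sanity check, a hypothetical sales line (made-up values) parses like this:

public class MapperParseDemo {
    public static void main(String[] args) {
        // Hypothetical sales line, following the sh.SALES column order:
        // PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID,QUANTITY_SOLD,AMOUNT_SOLD
        String line = "117,101,1998-01-20,3,999,1,16.22";
        String[] words = line.split(",");
        // Same parse as ProductMapper: key = product id, value = "year:amount_sold"
        int key = Integer.parseInt(words[0]);
        String value = words[2].substring(0, 4) + ":" + words[6];
        System.out.println(key + " -> " + value);   // prints: 117 -> 1998:16.22
    }
}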
Next, integrate with HBase: the Reducer extends TableReducer and writes the processed data into the table:
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;

public class HbaseDemo extends TableReducer<IntWritable, Text, NullWritable> {

    // The reducer builds the HBase output directly: values marked with "*" carry the product name,
    // the rest carry "year:amount_sold".
    // Note: result is a member field, so the yearly totals keep accumulating across product keys.
    HashMap<Integer, Double> result = new HashMap<Integer, Double>();
    String name;
    int year;
    double amount_sold;

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Put put = new Put(Bytes.toBytes(key.toString())); // the row key is the product ID
        for (Text v : values) {
            String data = v.toString();
            int a = data.indexOf("*");
            if (a >= 0) {
                name = data.substring(1); // product-name record
            } else {
                int b = data.indexOf(":");
                year = Integer.parseInt(data.substring(0, b));
                amount_sold = Double.parseDouble(data.substring(b + 1));
                // add the year and its sales amount into result
                if (result.containsKey(year)) {
                    Double mount = result.get(year);
                    result.put(year, (amount_sold + mount));
                } else {
                    result.put(year, amount_sold);
                }
            }
            // The columns are re-set on every value, so the last year seen for this key is what ends up in the row
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(String.valueOf(name)));                    // product name
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("year_id"), Bytes.toBytes(String.valueOf(year)));                 // year
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("amount_sold"), Bytes.toBytes(String.valueOf(result.get(year)))); // sales total
        }
        // Write the final result; the row key lives in the Put, so the output key is NullWritable
        context.write(NullWritable.get(), put);
    }
}
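To make the write format concrete, here is a small standalone sketch (PutInspection is a made-up class name, not part of the job) that builds a Put the same way the reducer does, using the row-117 values from the scan output further below, and prints the cells it would send to 'hdfs:product':

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutInspection {
    public static void main(String[] args) {
        // Row key is the product ID, exactly as in HbaseDemo
        Put put = new Put(Bytes.toBytes("117"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("CD-R"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("year_id"), Bytes.toBytes("2001"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("amount_sold"), Bytes.toBytes("2.3968519569992796E7"));
        // Print every cell the Put would write
        for (List<Cell> cells : put.getFamilyCellMap().values()) {
            for (Cell cell : cells) {
                System.out.println("info:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                        + " = " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}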
Finally, build the Job driver (the main program):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ProductMain {
    public static void main(String[] args) throws Exception {
        // Input path (hard-coded here for local testing)
        args = new String[]{"D:\\tmp\\prod"};
        Configuration conf = new Configuration();
        // ZooKeeper quorum of the HBase cluster
        conf.set("hbase.zookeeper.quorum", "192.168.128.111");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "hdfs:product"); // the output is an HBase table; give its name
        Job job = Job.getInstance(conf);
        TableMapReduceUtil.addDependencyJars(job); // ship the HBase dependency jars with the job
        // Configure the job: mapper and its output types
        job.setJarByClass(ProductMain.class);
        job.setMapperClass(ProductMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer; no reducer output key/value classes are needed,
        // since TableOutputFormat handles the write to HBase
        job.setReducerClass(HbaseDemo.class);
        // Input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output goes into an HBase table
        job.setOutputFormatClass(TableOutputFormat.class);
        // Run the job
        job.waitForCompletion(true);
    }
}
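As a side note, TableMapReduceUtil also provides initTableReducerJob, which sets the output format, the OUTPUT_TABLE property, the reducer class and the dependency jars in one call. A sketch of an equivalent driver (ProductMainAlt is a made-up name; it reuses the ProductMapper and HbaseDemo classes above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ProductMainAlt {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.128.111");
        Job job = Job.getInstance(conf);
        job.setJarByClass(ProductMainAlt.class);
        job.setMapperClass(ProductMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("D:\\tmp\\prod"));
        // Sets TableOutputFormat, the OUTPUT_TABLE property, the reducer class
        // and the dependency jars in one step
        TableMapReduceUtil.initTableReducerJob("hdfs:product", HbaseDemo.class, job);
        job.waitForCompletion(true);
    }
}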
4. Check the final output: a single region, with three columns per row (total sales amount, year, product name); the RowKey is the product ID:
hbase(main):002:0> desc 'hdfs:product'
Table hdfs:product is ENABLED
hdfs:product
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.2440 seconds
hbase(main):003:0> scan 'hdfs:product'
ROW COLUMN+CELL
113 column=info:amount_sold, timestamp=1538625381681, value=2.069043351997921E7
113 column=info:name, timestamp=1538625381681, value=CD-R Mini Discs
113 column=info:year_id, timestamp=1538625381681, value=1998
114 column=info:amount_sold, timestamp=1538625381815, value=2.0313520240019318E7
114 column=info:name, timestamp=1538625381815, value=Music CD-R
114 column=info:year_id, timestamp=1538625381815, value=2000
115 column=info:amount_sold, timestamp=1538625381926, value=1.883235699001709E7
115 column=info:name, timestamp=1538625381926, value=CD-RW
115 column=info:year_id, timestamp=1538625381926, value=1999
116 column=info:amount_sold, timestamp=1538625382035, value=2.041153556001948E7
116 column=info:name, timestamp=1538625382035, value=CD-RW
116 column=info:year_id, timestamp=1538625382035, value=2000
117 column=info:amount_sold, timestamp=1538625382179, value=2.3968519569992796E7
117 column=info:name, timestamp=1538625382179, value=CD-R
117 column=info:year_id, timestamp=1538625382179, value=2001
118 column=info:amount_sold, timestamp=1538625382323, value=1.9332802020017046E7
118 column=info:name, timestamp=1538625382323, value=OraMusic CD-R
118 column=info:year_id, timestamp=1538625382323, value=1999
119 column=info:amount_sold, timestamp=1538625382478, value=2.1135087109982926E7
119 column=info:name, timestamp=1538625382478, value=CD-R with Jewel Cases
119 column=info:year_id, timestamp=1538625382478, value=1998
120 column=info:amount_sold, timestamp=1538625382690, value=2.077849573001311E7
120 column=info:name, timestamp=1538625382690, value=DVD-R Disc with Jewel Case
120 column=info:year_id, timestamp=1538625382690, value=2000
121 column=info:amount_sold, timestamp=1538625383171, value=1.946866801001969E7
121 column=info:name, timestamp=1538625383171, value=DVD-RAM Jewel Case
121 column=info:year_id, timestamp=1538625383171, value=1999
122 column=info:amount_sold, timestamp=1538625383269, value=2.4405262879992317E7
122 column=info:name, timestamp=1538625383269, value=DVD-R Discs
122 column=info:year_id, timestamp=1538625383269, value=2001
123 column=info:amount_sold, timestamp=1538625383269, value=1.9679875930018328E7
123 column=info:name, timestamp=1538625383269, value=DVD-R Discs
123 column=info:year_id, timestamp=1538625383269, value=1999
124 column=info:amount_sold, timestamp=1538625383417, value=2.4786349059991E7
124 column=info:name, timestamp=1538625383417, value=DVD-RW Discs
124 column=info:year_id, timestamp=1538625383417, value=2001
125 column=info:amount_sold, timestamp=1538625383566, value=2.1444125459976837E7
125 column=info:name, timestamp=1538625383566, value=3 1/2" Bulk diskettes
125 column=info:year_id, timestamp=1538625383566, value=1998
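The same check can be done programmatically. A minimal sketch (ReadProductRow is a made-up class name) that fetches the row for product 117 back from 'hdfs:product':

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadProductRow {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.128.111");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("hdfs:product"))) {
            Result result = table.get(new Get(Bytes.toBytes("117")));   // row key = product ID
            String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            String year = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("year_id")));
            String amount = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("amount_sold")));
            System.out.println(name + " / " + year + " / " + amount);
        }
    }
}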
The source data is the sales table and the products table under the sh user in Oracle:
SQL> select count(*) from sales;
COUNT(*)
----------
918843
SQL> select count(*) from PRODUCTS;
COUNT(*)
----------
72
Summary: integrating Hadoop with HBase here simply combines a Hadoop Mapper with HBase's TableReducer. When I first specified the RowKey I reused the Key (the Mapper's output key) in the reducer output, which led to a NullPointerException; the fix was to make the Reducer's output key NullWritable and to specify the rowKey when constructing the Put, using the Mapper-side key (it must be unique). In the main method, note that the Reducer has no ordinary file output: you must call job.setOutputFormatClass(TableOutputFormat.class), i.e. the output goes into a table, and when building the job's configuration you must name the target table with conf.set(TableOutputFormat.OUTPUT_TABLE, "hdfs:product") (the table must already exist). Apart from that, only the ZooKeeper configuration parameters are needed.
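On the row-key uniqueness point: with the product ID alone as the row key, each product keeps a single row, so only one year's figures survive per product, which is what the scan above shows. If one row per product per year is wanted instead, a composite row key such as productId_year is one option. The following is only a sketch of such a variant (HbaseDemoPerYear is a made-up name, and unlike the job above it keeps totals per reduce call, i.e. per product):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class HbaseDemoPerYear extends TableReducer<IntWritable, Text, NullWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String name = "";
        Map<Integer, Double> totals = new HashMap<Integer, Double>();
        for (Text v : values) {
            String data = v.toString();
            if (data.startsWith("*")) {
                name = data.substring(1);                    // product-name record
            } else {
                int i = data.indexOf(":");
                int year = Integer.parseInt(data.substring(0, i));
                double amount = Double.parseDouble(data.substring(i + 1));
                totals.merge(year, amount, Double::sum);     // yearly total for this product only
            }
        }
        // One Put per (product, year): the composite row key keeps the rows unique
        for (Map.Entry<Integer, Double> e : totals.entrySet()) {
            Put put = new Put(Bytes.toBytes(key.get() + "_" + e.getKey()));   // e.g. "117_2001"
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("year_id"), Bytes.toBytes(String.valueOf(e.getKey())));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("amount_sold"), Bytes.toBytes(String.valueOf(e.getValue())));
            context.write(NullWritable.get(), put);
        }
    }
}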