Hbase通過BulkLoad快速匯入資料
阿新 • • 發佈:2018-11-15
HBase是一個分散式的、面向列的開源資料庫,它可以讓我們隨機的、實時的訪問大資料。大量的資料匯入到Hbase中時速度回很慢,不過我們可以使用bulkload來匯入。
BulkLoad的過程主要有以下部分:
1. 從資料來源提取資料並上傳到HDFS中。
2. 使用MapReduce作業準備資料
這一步需要一個MapReduce作業,並且大多數情況下還需要我們自己編寫Map函式,而Reduce函式不需要我們考慮,由HBase提供。該作業需要使用rowkey(行鍵)作為輸出Key,KeyValue、Put或者Delete作為輸出Value。MapReduce作業需要使用HFileOutputFormat2來生成HBase資料檔案。為了有效的匯入資料,需要配置HFileOutputFormat2使得每一個輸出檔案都在一個合適的區域中。為了達到這個目的,MapReduce作業會使用Hadoop的TotalOrderPartitioner類根據表的key值將輸出分割開來。HFileOutputFormat2的方法configureIncrementalLoad()會自動的完成上面的工作。
3. 告訴RegionServers資料的位置並匯入資料
這一步是最簡單的,通常需要使用LoadIncrementalHFiles(更為人所熟知是completebulkload工具),將檔案在HDFS上的位置傳遞給它,它就會利用RegionServer將資料匯入到相應的區域。
程式碼如下:
package com.test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.mapreduce.PutSortReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class BulkLoadMR { public static class BulkMap extends Mapper<Object, Text, ImmutableBytesWritable, Put>{ public static final String SP = "\t"; @Override protected void map(Object key,Text value,Context context) throws IOException, InterruptedException { String[] values = value.toString().split(SP); if(values.length == 2){ byte[] rowKey = Bytes.toBytes(values[0]); byte[] columnValue = Bytes.toBytes(values[1]); byte[] family = Bytes.toBytes("d"); byte[] columnName = Bytes.toBytes("c"); ImmutableBytesWritable rowKwyWritable = new ImmutableBytesWritable(rowKey); Put put = new Put(rowKey); put.add(family, columnName, columnValue); context.write(rowKwyWritable, put); } } } public static void main(String[] args) throws Exception { String dst = args[0]; String out = args[1]; int splitMB = Integer.parseInt(args[2]); String tableName = args[3]; Configuration conf = new Configuration(); conf.set("mapreduce.input.fileinput.split.maxsize", String.valueOf(splitMB * 1024 *1024)); conf.set("mapred.min.split.size", String.valueOf(splitMB * 1024 *1024)); conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(splitMB * 1024 *1024)); conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(splitMB * 1024 *1024)); Job job = new Job(conf,"BulkLoad"); job.setJarByClass(BulkLoadMR.class); job.setMapperClass(BulkMap.class); job.setReducerClass(PutSortReducer.class); job.setOutputFormatClass(HFileOutputFormat2.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); FileInputFormat.addInputPath(job, new Path(dst)); FileOutputFormat.setOutputPath(job, new Path(out)); Configuration hbaseconf = HBaseConfiguration.create(); HTable table = new HTable(hbaseconf, tableName); HFileOutputFormat2.configureIncrementalLoad(job, table); job.waitForCompletion(true); //將生成的入庫 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseconf); loader.doBulkLoad(new Path(out), table); } }
注意:
map的輸出部分key和value的型別必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>