1. 程式人生 > >Hbase通過BulkLoad快速匯入資料

Hbase通過BulkLoad快速匯入資料

HBase是一個分散式的、面向列的開源資料庫,它可以讓我們隨機的、實時的訪問大資料。大量的資料匯入到Hbase中時速度回很慢,不過我們可以使用bulkload來匯入。

BulkLoad的過程主要有以下部分:

1. 從資料來源提取資料並上傳到HDFS中。

2. 使用MapReduce作業準備資料

這一步需要一個MapReduce作業,並且大多數情況下還需要我們自己編寫Map函式,而Reduce函式不需要我們考慮,由HBase提供。該作業需要使用rowkey(行鍵)作為輸出Key,KeyValue、Put或者Delete作為輸出Value。MapReduce作業需要使用HFileOutputFormat2來生成HBase資料檔案。為了有效的匯入資料,需要配置HFileOutputFormat2使得每一個輸出檔案都在一個合適的區域中。為了達到這個目的,MapReduce作業會使用Hadoop的TotalOrderPartitioner類根據表的key值將輸出分割開來。HFileOutputFormat2的方法configureIncrementalLoad()會自動的完成上面的工作。

3. 告訴RegionServers資料的位置並匯入資料

這一步是最簡單的,通常需要使用LoadIncrementalHFiles(更為人所熟知是completebulkload工具),將檔案在HDFS上的位置傳遞給它,它就會利用RegionServer將資料匯入到相應的區域。

程式碼如下:

package com.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadMR {

	public static class BulkMap extends Mapper<Object, Text, ImmutableBytesWritable, Put>{
		public static final String SP = "\t";
		@Override
		protected void map(Object key,Text value,Context context)
				throws IOException, InterruptedException {
			String[] values = value.toString().split(SP);
			if(values.length == 2){
				byte[] rowKey = Bytes.toBytes(values[0]);
				byte[] columnValue = Bytes.toBytes(values[1]);
				byte[] family = Bytes.toBytes("d");
				byte[] columnName = Bytes.toBytes("c");
				
				ImmutableBytesWritable rowKwyWritable = new ImmutableBytesWritable(rowKey);
				Put put = new Put(rowKey);
				put.add(family, columnName, columnValue);
				context.write(rowKwyWritable, put);
				
			}
			
		}
	}
	
	
	public static void main(String[] args) throws Exception {
		String dst = args[0];
		String out = args[1];
		int splitMB = Integer.parseInt(args[2]);
		String tableName = args[3];
		
		Configuration conf = new Configuration();
		conf.set("mapreduce.input.fileinput.split.maxsize", String.valueOf(splitMB * 1024 *1024));
		conf.set("mapred.min.split.size", String.valueOf(splitMB * 1024 *1024));
		conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(splitMB * 1024 *1024));
		conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(splitMB * 1024 *1024));
		
		Job job = new Job(conf,"BulkLoad");
		job.setJarByClass(BulkLoadMR.class);
		job.setMapperClass(BulkMap.class);
		job.setReducerClass(PutSortReducer.class);
		job.setOutputFormatClass(HFileOutputFormat2.class);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);
		
		FileInputFormat.addInputPath(job, new Path(dst));
		FileOutputFormat.setOutputPath(job, new Path(out));
		
		Configuration hbaseconf = HBaseConfiguration.create();
		HTable table = new HTable(hbaseconf, tableName);
		HFileOutputFormat2.configureIncrementalLoad(job, table);
		
		job.waitForCompletion(true);
		
		//將生成的入庫
		LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseconf);
		loader.doBulkLoad(new Path(out), table);

	}

}


注意:

map的輸出部分key和value的型別必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>