HBase Write Optimization: Fast Data Ingestion with BulkLoad
1. Why import with BulkLoad? What is wrong with writing to HBase through the traditional TableOutputFormat?
Let's first look at HBase's write path.
MapReduce jobs normally write to HBase through TableOutputFormat: the reducer builds Put objects and writes them directly into the table. At large data volumes this is inefficient, because HBase blocks incoming writes and performs frequent flush, split, and compact operations, all of which are I/O heavy. It also hurts the stability of the HBase nodes: GC pauses grow long, responses slow down, nodes time out and drop out of the cluster, and a chain reaction follows. HBase also supports a bulk load ingestion path. It exploits the fact that HBase stores its data on HDFS in a fixed format: the job generates persistent HFile-format files directly on HDFS and then moves them into the right place, completing a fast bulk import. Driven by MapReduce this is efficient and convenient; it occupies no region resources and adds no serving load, so at large write volumes it greatly improves write throughput while reducing write pressure on the HBase nodes.
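For comparison, the direct-write setup being replaced looks roughly like this (a minimal sketch under the 0.94-era API used throughout this article; the job and table names are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class DirectWriteSetup {
    public static Job configure() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "directWriteJob");
        // Every Put the reducer emits is sent straight to the region servers,
        // which is exactly the write pressure described above.
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "word_count");
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        return job;
    }
}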
Generating HFiles first and then BulkLoading them into HBase, instead of calling TableOutputFormat directly, has the following benefits:
(1) It removes the insert pressure from the HBase cluster.
(2) The job runs faster, cutting its execution time.
At the moment this approach only works when the table has a single column family; newer HBase versions lift the single-column-family restriction.
2. The bulkload workflow in practice
The bulkload approach needs two cooperating jobs:
(1) The first job runs the original business logic, but instead of writing its results to HBase through TableOutputFormat, it writes them to an intermediate directory on HDFS (e.g. middata).
(2) The second job takes the first job's output (middata) as input and formats it into HBase's underlying storage files, HFiles.
(3) BulkLoad is then invoked to import the HFiles generated by the second job into the target HBase table.
The corresponding sample code is given below:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GeneratePutHFileAndBulkLoadToHBase {

    public static class WordCountMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        private Text wordText = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] wordArray = line.split(" ");
            for (String word : wordArray) {
                wordText.set(word);
                context.write(wordText, one);
            }
        }
    }

    public static class WordCountReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> valueList,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : valueList) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class ConvertWordCountOutToHFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String wordCountStr = value.toString();
            String[] wordCountArray = wordCountStr.split("\t");
            String word = wordCountArray[0];
            int count = Integer.valueOf(wordCountArray[1]);

            // Build the HBase row key
            byte[] rowKey = Bytes.toBytes(word);
            ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
            byte[] family = Bytes.toBytes("cf");
            byte[] qualifier = Bytes.toBytes("count");
            byte[] hbaseValue = Bytes.toBytes(count);
            // Put is meant for committing multiple columns under a column family;
            // with a single column, the KeyValue form can be used instead:
            // KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
            Put put = new Put(rowKey);
            put.add(family, qualifier, hbaseValue);
            context.write(rowKeyWritable, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration hadoopConfiguration = new Configuration();
        String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();

        // The first job is a plain MR job whose output goes to the given directory
        Job job = new Job(hadoopConfiguration, "wordCountJob");
        job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(dfsArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
        // Submit the first job
        int wordCountJobResult = job.waitForCompletion(true) ? 0 : 1;

        // The second job takes the first job's output as input. Only a Mapper
        // class is needed: it parses the first job's output and converts each
        // record into the KeyValue/Put form HBase expects.
        Job convertWordCountJobOutputToHFileJob = new Job(hadoopConfiguration, "wordCount_bulkload");
        convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);
        // No ReducerClass needs to be set; the framework chooses between
        // KeyValueSortReducer and PutSortReducer based on the MapOutputValueClass.
        // convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);
        convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
        convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class);

        // Use the first job's output as the second job's input
        FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[1]));
        FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[2]));

        // Create the HBase configuration and the target table object
        Configuration hbaseConfiguration = HBaseConfiguration.create();
        HTable wordCountTable = new HTable(hbaseConfiguration, "word_count");
        HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob, wordCountTable);

        // Submit the second job
        int convertWordCountJobOutputToHFileJobResult =
                convertWordCountJobOutputToHFileJob.waitForCompletion(true) ? 0 : 1;

        // After the second job finishes, bulk-load the MR results into HBase
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);
        // First argument: the second job's output directory, i.e. where the
        // HFiles are stored; second argument: the target table
        loader.doBulkLoad(new Path(dfsArgs[2]), wordCountTable);

        // Finally exit via System.exit
        System.exit(convertWordCountJobOutputToHFileJobResult);
    }
}
For example, the raw input data directory is /rawdata/test/wordcount/20131212,
the intermediate results directory is /middata/test/wordcount/20131212,
and the directory for the final generated HFiles is /resultdata/test/wordcount/20131212.
The jobs above are then run as follows:
hadoop jar test.jar /rawdata/test/wordcount/20131212 /middata/test/wordcount/20131212 /resultdata/test/wordcount/20131212
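Note that the example assumes the target table word_count with column family cf already exists. A minimal sketch that creates it through the old HBaseAdmin API (table and family names match the example above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateWordCountTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor("word_count");
        // a single column family, as the bulk-load example requires
        desc.addFamily(new HColumnDescriptor("cf"));
        admin.createTable(desc);
        admin.close();
    }
}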
3. Notes and caveats:
(1) The HFile approach is the fastest of all loading schemes, but with one precondition: the data is a first-time import and the table is empty. If the table already contains data, importing HFiles into it will trigger split operations.
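A common mitigation (an addition beyond the original text) is to pre-split the table at creation time so that a large first load spreads across regions instead of forcing splits. A hedged sketch, reusing the admin and desc objects from the table-creation sketch in section 2 plus org.apache.hadoop.hbase.util.Bytes, with illustrative split points:

// Pre-split "word_count" so the initial bulk load lands in several regions.
// Choose real split points from your own row-key distribution.
byte[][] splitKeys = new byte[][] {
    Bytes.toBytes("g"),
    Bytes.toBytes("n"),
    Bytes.toBytes("t")
};
admin.createTable(desc, splitKeys);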
(2) Whatever produces the final output, map or reduce, the output key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>.
Otherwise an error like the following is thrown:
java.lang.IllegalArgumentException: Can't read partitions file
...
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
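The fix is to declare the bulk-load job's map output types accordingly, exactly as the full example above does:

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class); // or KeyValue.class for the <ImmutableBytesWritable, KeyValue> variant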
(3) In the final output stage, the value type is KeyValue or Put, and the matching sorter is KeyValueSortReducer or PutSortReducer respectively. This sorter reducer need not be specified, because the source code already makes the decision:
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
} else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
(4) The MR example calls job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat can only organize one column family into HFiles at a time, so multiple column families require multiple jobs. Newer versions of HBase have removed this limitation.
(5) The HFiles the MR example finally generates are stored on HDFS, with one subdirectory per column family under the output path. Bulk loading them into HBase effectively moves the HFiles into HBase's regions, after which the column-family subdirectories no longer hold the files.
(6) The final reduce stage needs no setNumReduceTasks call, because the framework configures the reducer count automatically from the number of regions.
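For reference, the 0.94-era configureIncrementalLoad wires this up roughly as follows (paraphrased from the HBase source, not a verbatim excerpt):

// Inside HFileOutputFormat.configureIncrementalLoad (paraphrased):
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table); // one start key per region
job.setNumReduceTasks(startKeys.size());                            // one reducer per region
job.setPartitionerClass(TotalOrderPartitioner.class);               // partition rows by region boundaries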
(7) In the configuration below, the commented-out lines can be written or omitted with no effect: as the source shows, configureIncrementalLoad already applies all of the fixed configuration, and only the non-fixed parts need to be set by hand. (The original snippet elided the table names, column families, output path, and mapper/reducer bodies; illustrative placeholder values are filled in below so the sketch compiles.)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class HFileOutput {

    // Placeholder values (the original snippet left these undefined)
    private static final byte[] INPUT_FAMILY = Bytes.toBytes("cf");
    private static final byte[] OUTPUT_FAMILY = Bytes.toBytes("cf");
    private static final String inputTable = "input_table";
    private static final String outputTable = "output_table";
    private static final String outputDir = "/tmp/hfile_output";

    // job configuration
    public static Job configureJob(Configuration conf) throws IOException {
        Job job = new Job(conf, "countUnite1");
        job.setJarByClass(HFileOutput.class);
        // job.setNumReduceTasks(2);
        // job.setOutputKeyClass(ImmutableBytesWritable.class);
        // job.setOutputValueClass(KeyValue.class);
        // job.setOutputFormatClass(HFileOutputFormat.class);
        Scan scan = new Scan();
        scan.setCaching(10);
        scan.addFamily(INPUT_FAMILY);
        TableMapReduceUtil.initTableMapperJob(inputTable, scan,
                HFileOutputMapper.class, ImmutableBytesWritable.class,
                LongWritable.class, job);
        // If no reducer is defined here, it is automatically set to
        // KeyValueSortReducer.class or PutSortReducer.class
        job.setReducerClass(HFileOutputReducer.class);
        // job.setOutputFormatClass(HFileOutputFormat.class);
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, outputTable));
        HFileOutputFormat.setOutputPath(job, new Path(outputDir));
        // FileOutputFormat.setOutputPath(job, new Path(outputDir)); // equivalent to the line above
        return job;
    }

    public static class HFileOutputMapper extends
            TableMapper<ImmutableBytesWritable, LongWritable> {
        @Override
        public void map(ImmutableBytesWritable key, Result values,
                Context context) throws IOException, InterruptedException {
            // mapper logic (illustrative): emit a count of 1 per scanned row
            context.write(key, new LongWritable(1));
        }
    }

    public static class HFileOutputReducer extends
            Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // reducer logic (illustrative): sum the counts and emit one KeyValue per row
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            KeyValue kv = new KeyValue(key.get(), OUTPUT_FAMILY,
                    Bytes.toBytes("count"), Bytes.toBytes(count));
            context.write(key, kv);
        }
    }
}
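A driver for the sketch above might look like this (a hypothetical addition inside HFileOutput, plus an import of org.apache.hadoop.hbase.HBaseConfiguration); the output directory then feeds LoadIncrementalHFiles exactly as in section 2:

public static void main(String[] args) throws Exception {
    Job job = configureJob(HBaseConfiguration.create());
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}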