HBase的幾種匯入資料的方式
1、傳統的主要使用Hbase的shell進行手動的輸入,都需要經過Hbase的介面,過程
2、使用MapReduce進行批量的匯入,但是還是會經過Hbase的HMaster,HregionerServer一些列的過程,增加系統的資源的消耗。例如:
import java.text.SimpleDateFormat;
public class BatchImport {
//資料的形式類似於 0 20101223122329大叫好
static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
Text v2 = new Text();
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
final String[] splited = value.toString().split("\t");
try {
final Date date = new Date(Long.parseLong(splited[0].trim()));
final String dateFormat = dateformat1.format(date);
v2.set(splited[1]+":"+dateFormat+"\t"+value.toString());
context.write(key, v2);
} catch (NumberFormatException e) {
final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
counter.increment(1L);
System.out.println("出錯了"+splited[0]+" "+e.getMessage());
}
};
}
static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
protected void reduce(LongWritable key, java.lang.Iterable<Text> values,Context context) throws java.io.IOException ,InterruptedException {
for (Text text : values) {
final String[] splited = text.toString().split("\t");
final Put put = new Put(Bytes.toBytes(splited[0]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
//省略其他欄位,呼叫put.add(....)即可
context.write(NullWritable.get(), put);
}
};
}
public static void main(String[] args) throws Exception {
final Configuration configuration = new Configuration();
//設定zookeeper
configuration.set("hbase.zookeeper.quorum", "master");
//設定hbase表名稱
configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
//將該值改大,防止hbase超時退出
configuration.set("dfs.socket.timeout", "180000");
final Job job = new Job(configuration, "HBaseBatchImport");
job.setMapperClass(BatchImportMapper.class);
job.setReducerClass(BatchImportReducer.class);
//設定map的輸出,不設定reduce的輸出型別
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
//不再設定輸出路徑,而是設定輸出格式型別
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.setInputPaths(job, "hdfs://222.27.174.66:9000/input2");
job.waitForCompletion(true);
}
}
3、不經過Hbase的過程,直接在HDFS中生成HFile,在將HFile更新到相應的HReginServer中
可以使用命令的方式,將hdfs檔案轉化為hfile
首先建立一個表 create 'datatsv' ,'d'
建立一個檔案inputfile
row1 c1 c2
row2 c1 c2
row3 c1 c2
row4 c1 c2
row5 c1 c2
row6 c1 c2
row7 c1 c2
row8 c1 c2
row9 c1 c2
hadoop fs -put inputfile /user/input/inputfile //上傳到hdfs的目錄下
hadoop jar hbase-0.94.7.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,d:c1,d:c2 -Dimporttsv.bulk.output=/user/output/outputfile datatsv /user/input/inputfile
在/user/output/outputfile目錄下生成HFile檔案
將HFile匯入到HBAse中
bin/hbaseorg.apache.hadoop.hbase.mapreduce.LoadIncreamentalHFiles /user/output/outputfile datatsv
一、這種方式有很多的優點:
1. 如果我們一次性入庫hbase巨量資料,處理速度慢不說,還特別佔用Region資源, 一個比較高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat類。
2. 它是利用hbase的資料資訊按照特定格式儲存在hdfs內這一原理,直接生成這種hdfs記憶體儲的資料格式檔案,然後上傳至合適位置,即完成巨量資料快速入庫的辦法。配合mapreduce完成,高效便捷,而且不佔用region資源,增添負載。
二、這種方式也有很大的限制:
1. 僅適合初次資料匯入,即表內資料為空,或者每次入庫表內都無資料的情況。
三、接下來一個demo,簡單介紹整個過程。
1. 生成HFile部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
package zl.hbase.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
|