通過BulkLoad的方式快速匯入海量資料到Hbase
阿新 • • 發佈:2019-02-17
摘要
載入資料到HBase的方式有多種,通過HBase API匯入或命令列匯入或使用第三方(如sqoop)來匯入或使用MR來批量匯入(耗費磁碟I/O,容易在匯入的過程使節點宕機),但是這些方式不是慢就是在匯入的過程的佔用Region資料導致效率低下,今天要講的就是利用HBase在HDFS儲存原理及MapReduce的特性來快速匯入海量的資料HBase資料在HDFS下是如何儲存的?HBase中每張Table在根目錄(/HBase)下用一個資料夾儲存,Table名為資料夾名,在Table資料夾下每個Region同樣用一個資料夾儲存,每個Region資料夾下的每個列族也用資料夾儲存,而每個列族下儲存的就是一些HFile檔案,HFile就是HBase資料在HFDS下儲存格式,其整體目錄結構如下:/hbase/<tablename>/<encoded-regionname>/<column-family>/<filename>HBase資料寫路徑public class GenerateHFile extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] items = line.split("\t"); String ROWKEY = items[1] + items[2] + items[3]; ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes()); Put put = new Put(ROWKEY.getBytes()); //ROWKEY put.addColumn("INFO".getBytes(), "URL".getBytes(), items[0].getBytes()); put.addColumn("INFO".getBytes(), "SP".getBytes(), items[1].getBytes()); //出發點 put.addColumn("INFO".getBytes(), "EP".getBytes(), items[2].getBytes()); //目的地 put.addColumn("INFO".getBytes(), "ST".getBytes(), items[3].getBytes()); //出發時間 put.addColumn("INFO".getBytes(), "PRICE".getBytes(), Bytes.toBytes(Integer.valueOf(items[4]))); //價格 put.addColumn("INFO".getBytes(), "TRAFFIC".getBytes(), items[5].getBytes());//交通方式 put.addColumn("INFO".getBytes(), "HOTEL".getBytes(), items[6].getBytes()); //酒店 context.write(rowkey, put); } }
GenerateHFileMain類
public class GenerateHFileMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
final String INPUT_PATH= "hdfs://master:9000/INFO/Input";
final String OUTPUT_PATH= "hdfs://master:9000/HFILE/Output";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf("TRAVEL"));
Job job=Job.getInstance(conf);
job.getConfiguration().set("mapred.jar", "/home/hadoop/TravelProject/out/artifacts/Travel/Travel.jar"); //預先將程式打包再將jar分發到叢集上
job.setJarByClass(GenerateHFileMain.class);
job.setMapperClass(GenerateHFile.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("TRAVEL")))
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
System.exit(job.waitForCompletion(true)?0:1);
}