Hbase通過BulkLoad的方式快速匯入海量資料
阿新 • • 發佈:2019-01-23
摘要
載入資料到HBase的方式有多種,通過HBase API匯入或命令列匯入或使用第三方(如sqoop)來匯入或使用MR來批量匯入(耗費磁碟I/O,容易在匯入的過程使節點宕機),但是這些方式不是慢就是在匯入的過程的佔用Region資料導致效率低下,今天要講的就是利用HBase在HDFS儲存原理及MapReduce的特性來快速匯入海量的資料 HBase資料在HDFS下是如何儲存的? HBase中每張Table在根目錄(/HBase)下用一個資料夾儲存,Table名為資料夾名,在Table資料夾下每個Region同樣用一個資料夾儲存,每個Region資料夾下的每個列族也用資料夾儲存,而每個列族下儲存的就是一些HFile檔案,HFile就是HBase資料在HFDS下儲存格式,其整體目錄結構如下: /hbase/<tablename>/<encoded-regionname>/<column-family>/<filename> HBase資料寫路徑1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public class GenerateHFile extends Mapper<LongWritable,
Text,
ImmutableBytesWritable, Put>{
protected void map(LongWritable
key, Text value, Context context) throws IOException,
InterruptedException {
String
line = value.toString();
String[]
items = line.split( "\t" );
String
ROWKEY = items[ 1 ]
+ items[ 2 ]
+ items[ 3 ];
ImmutableBytesWritable
rowkey = new ImmutableBytesWritable(ROWKEY.getBytes());
Put
put = new Put(ROWKEY.getBytes()); //ROWKEY
put.addColumn( "INFO" .getBytes(), "URL" .getBytes(),
items[ 0 ].getBytes());
put.addColumn( "INFO" .getBytes(), "SP" .getBytes(),
items[ 1 ].getBytes()); //出發點
put.addColumn( "INFO" .getBytes(), "EP" .getBytes(),
items[ 2 ].getBytes()); //目的地
put.addColumn( "INFO" .getBytes(), "ST" .getBytes(),
items[ 3 ].getBytes()); //出發時間
put.addColumn( "INFO" .getBytes(), "PRICE" .getBytes(),
Bytes.toBytes(Integer.valueOf(items[ 4 ]))); //價格
put.addColumn( "INFO" .getBytes(), "TRAFFIC" .getBytes(),
items[ 5 ].getBytes()); //交通方式
put.addColumn( "INFO" .getBytes(), "HOTEL" .getBytes(),
items[ 6 ].getBytes()); //酒店
context.write(rowkey,
put);
}
}
|
public class GenerateHFileMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { final String INPUT_PATH= "hdfs://master:9000/INFO/Input"; final String OUTPUT_PATH= "hdfs://master:9000/HFILE/Output"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf("TRAVEL")); Job job=Job.getInstance(conf); job.getConfiguration().set("mapred.jar", "/home/hadoop/TravelProject/out/artifacts/Travel/Travel.jar"); //預先將程式打包再將jar分發到叢集上 job.setJarByClass(GenerateHFileMain.class); job.setMapperClass(GenerateHFile.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setOutputFormatClass(HFileOutputFormat2.class); HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("TRAVEL"))) FileInputFormat.addInputPath(job,new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true)?0:1); }注意 1.Mapper的輸出Key型別必須是包含Rowkey的ImmutableBytesWritable格式,Value型別必須為KeyValue或Put型別,當匯入的資料有多列時使用Put,只有一個列時使用KeyValue 2.job.setMapOutPutValueClass的值決定了job.setReduceClass的值,這裡Reduce主要起到了對資料進行排序的作用,當job.setMapOutPutValueClass的值Put.class和KeyValue.class分別對應job.setReduceClass的PutSortReducer和KeyValueSortReducer 3.在建立表時對錶進行預分割槽再結合MapReduce的平行計算機制能加快HFile檔案的生成,如果對錶進行了預分割槽(Region)就設定Reduce數等於分割槽數(Region) 4.在多列族的情況下需要進行多次的context.write 2.通過BlukLoad方式載入HFile檔案
public class LoadIncrementalHFileToHBase { public static void main(String[] args) throws Exception { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); Table table = connection.getTable(TableName.valueOf("TRAVEL")); LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf); load.doBulkLoad(new Path("hdfs://master:9000/HFILE/OutPut"), admin,table,connection.getRegionLocator(TableName.valueOf("TRAVEL"))); } }由於BulkLoad是繞過了Write to WAL,Write to MemStore及Flush to disk的過程,所以並不能通過WAL來進行一些複製資料的操作 優點: 1.匯入過程不佔用Region資源 2.能快速匯入海量的資料 3.節省記憶體