1. 程式人生 > >通過BulkLoad的方式快速匯入海量資料到Hbase

通過BulkLoad的方式快速匯入海量資料到Hbase

摘要

載入資料到HBase的方式有多種,通過HBase API匯入或命令列匯入或使用第三方(如sqoop)來匯入或使用MR來批量匯入(耗費磁碟I/O,容易在匯入的過程使節點宕機),但是這些方式不是慢就是在匯入的過程的佔用Region資料導致效率低下,今天要講的就是利用HBase在HDFS儲存原理及MapReduce的特性來快速匯入海量的資料HBase資料在HDFS下是如何儲存的?HBase中每張Table在根目錄(/HBase)下用一個資料夾儲存,Table名為資料夾名,在Table資料夾下每個Region同樣用一個資料夾儲存,每個Region資料夾下的每個列族也用資料夾儲存,而每個列族下儲存的就是一些HFile檔案,HFile就是HBase資料在HFDS下儲存格式,其整體目錄結構如下:/hbase/<tablename>/<encoded-regionname>/<column-family>/<filename>HBase資料寫路徑
                                                                              (圖來自Cloudera)在put資料時會先將資料的更新操作資訊和資料資訊寫入WAL,在寫入到WAL後,資料就會被放到MemStore中,當MemStore滿後資料就會被flush到磁碟(即形成HFile檔案),在這過程涉及到的flush,split,compaction等操作都容易造成節點不穩定,資料匯入慢,耗費資源等問題,在海量資料的匯入過程極大的消耗了系統性能,避免這些問題最好的方法就是使用BlukLoad的方式來載入資料到HBase中。
原理利用HBase資料按照HFile格式儲存在HDFS的原理,使用Mapreduce直接生成HFile格式檔案後,RegionServers再將HFile檔案移動到相應的Region目錄下其流程如下圖:                                                                      (圖來自Cloudera)匯入過程1.使用MapReduce生成HFile檔案GenerateHFile類
public class GenerateHFile extends Mapper<LongWritable,
        Text, ImmutableBytesWritable, Put>{
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] items = line.split("\t");
 
            String ROWKEY = items[1] + items[2] + items[3];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes());
            Put put = new Put(ROWKEY.getBytes());   //ROWKEY
            put.addColumn("INFO".getBytes(), "URL".getBytes(), items[0].getBytes());
            put.addColumn("INFO".getBytes(), "SP".getBytes(), items[1].getBytes());  //出發點
            put.addColumn("INFO".getBytes(), "EP".getBytes(), items[2].getBytes());  //目的地
            put.addColumn("INFO".getBytes(), "ST".getBytes(), items[3].getBytes());   //出發時間
            put.addColumn("INFO".getBytes(), "PRICE".getBytes(), Bytes.toBytes(Integer.valueOf(items[4])));  //價格
            put.addColumn("INFO".getBytes(), "TRAFFIC".getBytes(), items[5].getBytes());//交通方式
            put.addColumn("INFO".getBytes(), "HOTEL".getBytes(), items[6].getBytes()); //酒店
           
            context.write(rowkey, put);
        }
}

GenerateHFileMain類

public class GenerateHFileMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final String INPUT_PATH= "hdfs://master:9000/INFO/Input";
        final String OUTPUT_PATH= "hdfs://master:9000/HFILE/Output";
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("TRAVEL"));
        Job job=Job.getInstance(conf);
        job.getConfiguration().set("mapred.jar", "/home/hadoop/TravelProject/out/artifacts/Travel/Travel.jar");  //預先將程式打包再將jar分發到叢集上
        job.setJarByClass(GenerateHFileMain.class);
        job.setMapperClass(GenerateHFile.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("TRAVEL")))
        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true)?0:1);
    }