Bulk-loading data from HDFS into HBase
package mr.hdfstoHbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Bulk-load data from HDFS into HBase.
 * The map phase reads the source data from HDFS and writes it out as HFiles;
 * the HFiles are then loaded into HBase.
 */
public class HdfsToHbaseBulk {

    public static class HdfsToHbaseBulkMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        private ImmutableBytesWritable mapKey = new ImmutableBytesWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line has the form: rowkey,name,age
            String[] split = value.toString().split(",");
            mapKey.set(Bytes.toBytes(split[0]));
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(split[2]));
            context.write(mapKey, put);
        }
    }

    public static void main(String[] args) throws Exception {
        // Needed only for local debugging on Windows
        System.setProperty("hadoop.home.dir",
                "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");

        // HDFS connection
        Configuration conf = new Configuration();
        // HDFS entry point
        conf.set("fs.defaultFS", "hdfs://wang:9000");
        // HBase connection (via ZooKeeper)
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf);
        job.setJobName("HdfsToHbaseBulkJob");
        job.setJarByClass(HdfsToHbaseBulk.class);

        // Input path on HDFS
        Path inputPath = new Path("/user/wang/hbase_data/human.txt");
        FileInputFormat.addInputPath(job, inputPath);

        // Map tasks run on the cluster: move the computation, not the data
        job.setMapperClass(HdfsToHbaseBulkMapper.class);
        // Map output key class
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        // Map output value class
        job.setMapOutputValueClass(Put.class);

        // Output path for the generated HFiles
        Path outputPath = new Path("/user/wang/hbase_data/BlukOut5");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(Bytes.toBytes("hadoop:human")));
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(Bytes.toBytes("hadoop:human")));
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
        job.setOutputFormatClass(HFileOutputFormat2.class);

        // Run the job
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean flag = job.waitForCompletion(true);

        // Once the MR job finishes successfully, HFiles exist under the output directory
        if (flag) {
            // HBase imports HFile-format files with LoadIncrementalHFiles
            LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf);
            // Parse the HFiles and load them into HBase
            loadIncrementalHFiles.doBulkLoad(outputPath, connection.getAdmin(), table, regionLocator);
        }
    }
}
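After the bulk load completes, it is easy to spot-check that the rows landed in the table. The class below is a minimal sketch, not part of the job above: the class name HdfsToHbaseBulkCheck and the row key "1001" are assumed example values, and it reuses the same hadoop:human table and info:name / info:age columns as the job.

package mr.hdfstoHbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Spot-check a bulk-loaded row. A sketch only: the row key "1001" is an
 * assumed example value, not something produced by the job above.
 */
public class HdfsToHbaseBulkCheck {
    public static void main(String[] args) throws Exception {
        // Same ZooKeeper settings as the bulk-load job
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("hadoop:human"))) {
            Get get = new Get(Bytes.toBytes("1001"));   // assumed example row key
            Result result = table.get(get);
            String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            String age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
            System.out.println("name=" + name + ", age=" + age);
        }
    }
}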