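Copying an HDFS file into an HBase table with MapReduce
阿新 • Published: 2018-12-16
Preparation:
1. An HBase table named fruit (empty) and a file on HDFS, fruit.tsv (with data). We will use MapReduce to write the contents of that file into the fruit table. The table needs an info column family, because the mapper writes the name and color columns into it.
2. Create the package as follows:
The code is as follows:
Map side: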
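import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HdfsMapper extends Mapper<LongWritable, Text, NullWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is expected to be: rowkey <TAB> name <TAB> color
        String[] fields = value.toString().split("\t");

        // The first field becomes the row key; the other two go into the "info" family.
        Put put = new Put(Bytes.toBytes(fields[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

        context.write(NullWritable.get(), put);
    }
}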
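The mapper indexes fields 0 through 2 of each split line directly, so a line with fewer than three tab-separated fields would raise an ArrayIndexOutOfBoundsException and fail the task. The splitting step can be tried without a cluster. What follows is only a minimal sketch in plain Java: the class FruitLineParser, the holder FruitRecord, and the sample lines are hypothetical and not part of the original post, and the actual contents of fruit.tsv are not shown there.

```java
import java.util.Optional;

/** Cluster-free sketch of the tab splitting done in HdfsMapper.map(). */
class FruitLineParser {

    /** Hypothetical holder for the three fields the mapper reads. */
    static final class FruitRecord {
        final String rowKey;
        final String name;
        final String color;

        FruitRecord(String rowKey, String name, String color) {
            this.rowKey = rowKey;
            this.name = name;
            this.color = color;
        }
    }

    /**
     * Splits a line on tabs, as the mapper does, but returns an empty
     * Optional instead of throwing when fewer than three fields are present.
     */
    static Optional<FruitRecord> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.split("\t");
        if (fields.length < 3) {
            return Optional.empty();
        }
        return Optional.of(new FruitRecord(fields[0], fields[1], fields[2]));
    }

    public static void main(String[] args) {
        // Made-up sample lines, used only for illustration.
        String[] samples = {"1001\tApple\tRed", "1002\tPear", ""};
        for (String sample : samples) {
            String shown = parse(sample)
                    .map(r -> r.rowKey + " -> " + r.name + ", " + r.color)
                    .orElse("skipped malformed line");
            System.out.println(shown);
        }
    }
}
```

Inside map(), the same check could be used to skip a malformed line rather than build a Put from it; whether to skip, count, or fail on such lines is a design choice the original post does not address.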
Reduce side:
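import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class HdfsReducer extends TableReducer<NullWritable, Put, NullWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<Put> puts, Context context)
            throws IOException, InterruptedException {
        // Pass every Put through unchanged; the table output format writes it to HBase.
        for (Put put : puts) {
            context.write(NullWritable.get(), put);
        }
    }
}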
Driver side:
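import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Configured supplies setConf()/getConf(), so the driver does not need to
// extend Configuration or keep its own conf field.
public class HdfsDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(HdfsDriver.class);

        // Map side: read text lines, emit Put objects.
        job.setMapperClass(HdfsMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Reduce side: write the Puts into the "fruit" table.
        TableMapReduceUtil.initTableReducerJob("fruit", HdfsReducer.class, job);

        // Input file on HDFS.
        FileInputFormat.setInputPaths(job, "/root/fruit.tsv");

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int exitCode = ToolRunner.run(conf, new HdfsDriver(), args);
        System.exit(exitCode);
    }
}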
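The driver receives args in run() but hard-codes both the table name and the input path. As a rough sketch of one way to make them overridable, the helper below falls back to the values from the post when no arguments are given. The class DriverArgs and the argument order (table first, input path second) are assumptions made for illustration and are not part of the original code.

```java
/** Hypothetical helper: resolves the table name and input path for the driver. */
class DriverArgs {

    // Defaults taken from the values hard-coded in the post's driver.
    static final String DEFAULT_TABLE = "fruit";
    static final String DEFAULT_INPUT = "/root/fruit.tsv";

    final String table;
    final String input;

    private DriverArgs(String table, String input) {
        this.table = table;
        this.input = input;
    }

    /**
     * Assumed convention: args[0] is the table name and args[1] is the
     * HDFS input path; a missing argument falls back to its default.
     */
    static DriverArgs from(String[] args) {
        String table = (args != null && args.length > 0) ? args[0] : DEFAULT_TABLE;
        String input = (args != null && args.length > 1) ? args[1] : DEFAULT_INPUT;
        return new DriverArgs(table, input);
    }

    public static void main(String[] args) {
        DriverArgs parsed = DriverArgs.from(args);
        System.out.println("table=" + parsed.table + ", input=" + parsed.input);
    }
}
```

Inside run(), parsed.table would then be passed to TableMapReduceUtil.initTableReducerJob and parsed.input to FileInputFormat.setInputPaths in place of the string literals.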
The result of the run is as follows: