MR操作hbase的一點心得(含hbase表拷貝樣例程式碼)
阿新 • 發佈:2019-01-01
最近在寫基於hbase的MR程式。總結如下:
1、使用TableMapper來讀取表
2、寫入表的第一種方式是用TableMapReduceUtil.initTableReducerJob的方法,這裡既可以在map階段輸出,也能在reduce階段輸出。區別是Reduce的class設定為null或者實際的reduce。以下是一個表copy的例子:
3、寫入表的方式還有一種,就是呼叫hbase的原生api,即HTable.put的方式寫入資料(這種方式適合寫少量資料,或者統計後的結果)

package com.run.test;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;

/**
 * Map-only MapReduce job that copies every row of one HBase table into another.
 *
 * <p>Usage (via ToolRunner): {@code TableCopy <jobName> <srcTable> <dstTable>}.
 * The mapper re-emits each scanned row as a {@link Put}; with zero reduce
 * tasks, {@code initTableReducerJob} wires the map output straight into the
 * destination table.
 */
public class TableCopy extends Configured implements Tool {

    /** Turns each scanned {@link Result} into a {@link Put} keyed by the same row. */
    static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // Collect all cells of the current row.
            List<KeyValue> kvs = value.list();
            if (kvs == null || kvs.isEmpty()) {
                // Nothing to copy for this row (e.g. filtered-out / empty Result).
                return;
            }
            // BUGFIX: Put has no usable no-arg constructor — the row key is
            // mandatory. Reuse the source row key so the destination row matches.
            Put p = new Put(key.get());
            // Load every cell into the Put unchanged.
            for (KeyValue kv : kvs) {
                p.add(kv);
            }
            // Emit the row; the table output format writes it to dstTable.
            context.write(key, p);
        }
    }

    /**
     * Builds the copy job.
     *
     * @param conf cluster configuration (must carry the HBase connection settings)
     * @param args {@code [jobName, srcTable, dstTable]}
     * @return a fully configured, unsubmitted {@link Job}
     * @throws IOException if job or scan setup fails
     * @throws IllegalArgumentException if fewer than three arguments are given
     */
    public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
        if (args.length < 3) {
            throw new IllegalArgumentException("Usage: TableCopy <jobName> <srcTable> <dstTable>");
        }
        String jobName = args[0];
        String srcTable = args[1];
        String dstTable = args[2];

        Scan sc = new Scan();
        sc.setCaching(10000);      // large scanner cache: fewer RPCs for a full-table scan
        sc.setCacheBlocks(false);  // don't pollute the block cache with a one-off scan

        Job job = new Job(conf, jobName);
        job.setJarByClass(TableCopy.class);
        job.setNumReduceTasks(0);  // map-only copy
        // BUGFIX: the mapper's output value class is Put, not Result — the
        // original passed Result.class here, which breaks the job's type wiring.
        TableMapReduceUtil.initTableMapperJob(
                srcTable, sc, CopyMapper.class, ImmutableBytesWritable.class, Put.class, job);
        // Reducer class is null: map output goes directly to the destination table.
        TableMapReduceUtil.initTableReducerJob(dstTable, null, job);
        return job;
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = createSubmittableJob(getConf(), args);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}