HBase之自定義HBase-Mapreduce案例一
阿新 • • 發佈:2018-11-14
1.需求場景
將HBase中的ys表中的一部分資料通過Mapreduce遷移到ys_mr表中
2.程式碼編寫
1)構建ReadysMapreduce類,用於讀取ys表中的資料
package cn.ysjh; import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Mapper; public class ReadysMapreduce extends TableMapper<ImmutableBytesWritable,Put>{ @Override protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException { //將 fruit 的 name 和 color 提取出來,相當於將每一行資料讀取出來放入到 Put 物件中。 Put put = new Put(key.get()); //遍歷新增 column 行 for(Cell cell: value.rawCells()){ //新增/克隆列族:info if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){ //新增/克隆列:name if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ //將該列 cell 加入到 put 物件中 put.add(cell); //新增/克隆列:color }else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ //向該列 cell 加入到 put 物件中 put.add(cell); } } } //將從 fruit 讀取到的每行資料寫入到 context 中作為 map 的輸出 context.write(key, put); } }
2)構建WriteysReduce類,用於將讀取到的fruit表中的資料寫入到fruit_mr表中
package cn.ysjh; import java.io.IOException; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class WriteysReduce extends TableReducer<ImmutableBytesWritable, Put, NullWritable>{ @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { //讀出來的每一行資料寫入到 fruit_mr 表中 for(Put put: values){ context.write(NullWritable.get(), put); } } }
3)構建JobysMapreduce類,用於建立Job任務
package cn.ysjh; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class JobysMapreduce extends Configured implements Tool{ public int run(String[] args) throws Exception { //得到 Configuration Configuration conf = this.getConf(); //建立 Job 任務 Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(JobysMapreduce.class); //配置 Job Scan scan = new Scan(); scan.setCacheBlocks(false); scan.setCaching(500); //設定 Mapper,注意匯入的是 mapreduce 包下的,不是 mapred 包下的,後者是老版本 TableMapReduceUtil.initTableMapperJob( "ys", //資料來源的表名 scan, //scan 掃描控制器 ReadysMapreduce.class,//設定 Mapper 類 ImmutableBytesWritable.class,//設定 Mapper 輸出 key 型別 Put.class,//設定 Mapper 輸出 value 值型別 job//設定給哪個 JOB ); //設定 Reducer TableMapReduceUtil.initTableReducerJob("ys_mr", WriteysReduce.class, job); //設定 Reduce 數量,最少 1 個 job.setNumReduceTasks(1); boolean isSuccess = job.waitForCompletion(true); if(!isSuccess){ throw new IOException("Job running with error"); } return isSuccess ? 0 : 1; } public static void main( String[] args ) throws Exception{ Configuration conf = HBaseConfiguration.create(); int status = ToolRunner.run(conf, new JobysMapreduce(), args); System.exit(status); } }
3.打包執行
使用maven 打包命令:-P local clean package,然後將jar包上傳到叢集上執行測試
注意: 如果待匯入資料的表不存在,則需要提前建立