1. 程式人生 > >HBase之自定義HBase-Mapreduce案例一

HBase之自定義HBase-Mapreduce案例一

1.需求場景

將HBase中的ys表中的一部分資料通過Mapreduce遷移到ys_mr表中

2.程式碼編寫

  1)構建ReadysMapreduce類,用於讀取ys表中的資料

package cn.ysjh;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;



public class ReadysMapreduce extends TableMapper<ImmutableBytesWritable,Put>{
	
@Override
protected void map(ImmutableBytesWritable key, Result value,
		Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
		throws IOException, InterruptedException {
	//將 fruit 的 name 和 color 提取出來,相當於將每一行資料讀取出來放入到 Put 物件中。
    Put put = new Put(key.get());
//遍歷新增 column 行
    for(Cell cell: value.rawCells()){
//新增/克隆列族:info
        if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
//新增/克隆列:name
            if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
//將該列 cell 加入到 put 物件中
                put.add(cell);
//新增/克隆列:color
            }else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
//向該列 cell 加入到 put 物件中
                put.add(cell);
            }
        }
    }
    //將從 fruit 讀取到的每行資料寫入到 context 中作為 map 的輸出
    context.write(key, put);

}
	
}

  2)構建WriteysReduce類,用於將讀取到的fruit表中的資料寫入到fruit_mr表中

package cn.ysjh;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class WriteysReduce extends TableReducer<ImmutableBytesWritable, Put, NullWritable>{

	@Override
	protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
			Context context)
			throws IOException, InterruptedException {
		//讀出來的每一行資料寫入到 fruit_mr 表中
        for(Put put: values){
            context.write(NullWritable.get(), put);
	}
}
}

  3)構建JobysMapreduce類,用於建立Job任務

package cn.ysjh;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobysMapreduce extends Configured implements Tool{
	
	public int run(String[] args) throws Exception {
		//得到 Configuration
        Configuration conf = this.getConf();
//建立 Job 任務
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(JobysMapreduce.class);
//配置 Job
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);
//設定 Mapper,注意匯入的是 mapreduce 包下的,不是 mapred 包下的,後者是老版本
        TableMapReduceUtil.initTableMapperJob(
                "ys", //資料來源的表名
                scan, //scan 掃描控制器
                ReadysMapreduce.class,//設定 Mapper 類
                ImmutableBytesWritable.class,//設定 Mapper 輸出 key 型別
                Put.class,//設定 Mapper 輸出 value 值型別
                job//設定給哪個 JOB
        );
//設定 Reducer
        TableMapReduceUtil.initTableReducerJob("ys_mr", WriteysReduce.class,
                job);
//設定 Reduce 數量,最少 1 個
        job.setNumReduceTasks(1);
        boolean isSuccess = job.waitForCompletion(true);
        if(!isSuccess){
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }
	
	public static void main( String[] args ) throws Exception{
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new JobysMapreduce(), args);
        System.exit(status);
    }
	
	}

3.打包執行

使用maven 打包命令:-P local clean package,然後將jar包上傳到叢集上執行測試

注意:   如果待匯入資料的表不存在,則需要提前建立