將檔案匯入到HBase資料表中
阿新 • • 發佈:2018-12-19
楔子
學習 HBase:從 HDFS 讀取資料並匯入到 HBase 資料表。
1.1 構建Mapper讀取HDFS資料
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io. Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that reads tab-separated lines from HDFS and converts each line
 * into an HBase {@link Put}.
 *
 * <p>Expected input format per line: {@code rowKey \t name \t color}.
 * Malformed lines (fewer than three tab-separated fields) are skipped
 * instead of failing the whole job with an ArrayIndexOutOfBoundsException.
 *
 * @author 作者
 * @version 2018-11-05
 */
public class ReadDataFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException {
		String line = value.toString();
		// Clean while reading: split the record into rowKey / name / info fields.
		String[] values = line.split("\t");
		if (values.length < 3) {
			// Skip malformed records rather than crashing the mapper.
			return;
		}
		String row = values[0];
		String name = values[1];
		String infos = values[2];
		// Row key: use Bytes.toBytes (UTF-8) instead of String.getBytes(),
		// which depends on the platform default charset and could produce a
		// different key than the Put built below.
		ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(row));
		Put put = new Put(Bytes.toBytes(row));
		// Arguments: column family, column qualifier, value.
		// addColumn replaces the deprecated Put#add(byte[], byte[], byte[]).
		put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
		put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(infos));
		context.write(rowKey, put);
	}
}
1.2 構建Reducer類
import java.io.IOException;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer that writes each incoming {@link Put} unchanged into the target
 * HBase table configured via {@code TableMapReduceUtil.initTableReducerJob}.
 *
 * <p>The output key is irrelevant to {@code TableOutputFormat}, so
 * {@link NullWritable} is emitted for every mutation.
 *
 * @author 作者
 * @version 2018-11-05
 */
public class WriteHBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
	@Override
	protected void reduce(ImmutableBytesWritable imm, Iterable<Put> puts, Reducer<ImmutableBytesWritable, Put, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
		// Forward every Put produced by the mapper straight to the table.
		for (Put mutation : puts) {
			context.write(NullWritable.get(), mutation);
		}
	}
}
1.3 組裝JOB並執行
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Job driver: wires the HDFS-reading mapper and the HBase-writing reducer
 * into a MapReduce job and runs it via {@link ToolRunner}.
 *
 * <p>Usage: {@code HDFSDriver [inputPath] [tableName]} — both arguments are
 * optional; when omitted, the original hard-coded defaults are used, so
 * existing invocations keep working.
 *
 * @author 作者
 * @version 2018-11-05
 */
public class HDFSDriver extends Configured implements Tool {

	/** Default input file, used when no CLI argument is supplied. */
	private static final String DEFAULT_INPUT = "hdfs://hadoop:9000//input/fruit.tsv";
	/** Default target HBase table, used when no second CLI argument is supplied. */
	private static final String DEFAULT_TABLE = "fruits_hdfs";

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = this.getConf();
		// Allow input path and table name to be overridden on the command
		// line; fall back to the original hard-coded values otherwise.
		String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
		String table = args.length > 1 ? args[1] : DEFAULT_TABLE;

		// Assemble the job.
		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(HDFSDriver.class);

		// Input path.
		FileInputFormat.addInputPath(job, new Path(input));

		// Mapper.
		job.setMapperClass(ReadDataFromHDFSMapper.class);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);

		// Reducer: initTableReducerJob wires TableOutputFormat to the table.
		TableMapReduceUtil.initTableReducerJob(table, WriteHBaseReducer.class, job);
		// At least one reducer is required to write into the table.
		job.setNumReduceTasks(1);

		boolean completion = job.waitForCompletion(true);
		if (!completion) {
			throw new IOException(" JOB 執行錯誤");
		}
		// Reaching here means the job succeeded (failure throws above).
		return 0;
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "hadoop"); // standalone ZooKeeper host
		conf.set("hbase.zookeeper.property.clientPort", "2181"); // ZooKeeper client port
		int run = ToolRunner.run(conf, new HDFSDriver(), args);
		System.exit(run);
	}
}