MapReduce 操作 HBase
1.HBase與MR關係
HBase和MapReduce,這兩者並沒有直接關係,隸屬於不同的專案。這裡講到的MapReduce on HBase是指利用HBase表作為MR計算框架的資料輸入源或者輸出源,使得能夠利用MR的平行計算能力計算HBase的內部資料。
2.官方HBase-MapReduce
(1)檢視HBase執行MapReduce任務所需的依賴(classpath)
$ bin/hbase mapredcp
(2)執行環境變數的匯入
$ export HBASE_HOME=/opt/module/hbase-1.3.1
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
(3)執行官方的MapReduce任務
-- 案例一:統計Student表中有多少行資料
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
(4)案例二:使用MapReduce將本地資料匯入到HBase
1)在本地建立一個tsv格式的檔案:fruit.tsv
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
2)建立HBase表
hbase(main):001:0> create 'fruit','info'
3)在HDFS中建立input_fruit資料夾並上傳fruit.tsv檔案
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
4)執行MapReduce到HBase的fruit表中
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
/input_fruit
5)使用scan命令檢視匯入後的結果
hbase(main):001:0> scan 'fruit'
3.自定義HBase-MapReduce1
目標:將fruit表中的一部分資料,通過MR遷入到fruit_mr表中。
分步實現:
(0) 新建專案後在pom.xml中新增依賴:
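<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">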
<modelVersion>4.0.0</modelVersion>
<groupId>com.luomk</groupId>
<artifactId>HBaseMapReduce1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
</project>
(1)構建HbaseMapper類,用於讀取fruit表中的資料
package com.luomk;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
 * Mapper for the table-to-table copy job: reads each row of the source HBase
 * table and emits a {@link Put} containing only the cells whose column
 * qualifier is {@code name}.
 *
 * <p>Only the qualifier is compared; the column family is not checked, so a
 * {@code name} cell from any family of the source row is copied.
 */
public class HbaseMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /** Qualifier of the cells that are carried over to the target table. */
    private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");

    /**
     * Builds a {@link Put} for the current row from its {@code name} cells and
     * writes it out keyed by the row key.
     *
     * @param key     row key of the current source row
     * @param value   all cells of the current source row
     * @param context MapReduce context used to emit the (row key, Put) pair
     * @throws IOException          if adding a cell or writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Build the Put on a private copy of the row key bytes.
        Put put = new Put(key.copyBytes());
        for (Cell cell : value.rawCells()) {
            // Byte-wise qualifier comparison; avoids cloning and decoding every qualifier.
            if (CellUtil.matchingQualifier(cell, NAME_QUALIFIER)) {
                put.add(cell);
            }
        }
        // A row without any "name" cell would yield an empty Put, which the HBase
        // client rejects when it is finally written; skip such rows instead of
        // failing the whole job.
        if (put.isEmpty()) {
            return;
        }
        context.write(key, put);
    }
}
(2)構建HbaseReducer類,用於將讀取到的fruit表中的資料寫入到fruit_mr表中
package com.luomk;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import java.io.IOException;
/**
 * Pass-through reducer for the table-to-table copy job: every {@link Put}
 * received for a row key is written out unchanged under that same key.
 */
public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    /**
     * Forwards each incoming {@link Put} to the job output.
     *
     * @param key     row key shared by all values of this group
     * @param values  the Puts emitted by the mappers for this row key
     * @param context MapReduce context used to emit the output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        java.util.Iterator<Put> pending = values.iterator();
        while (pending.hasNext()) {
            Put mutation = pending.next();
            context.write(key, mutation);
        }
    }
}
(3)呼叫執行
package com.luomk;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
/**
 * Driver for the table-to-table copy job.
 *
 * <p>Usage: {@code HbaseDriver <source table> <target table>}. The source
 * table is scanned by {@code HbaseMapper} and the resulting Puts are written
 * to the target table by {@code HbaseReducer}.
 */
public class HbaseDriver {

    /**
     * Configures and submits the job, then exits the JVM with 0 on success,
     * 1 if the job failed, or 2 if the arguments are missing.
     *
     * @param args {@code args[0]} is the source table name, {@code args[1]} the target table name
     * @throws IOException            if the job cannot be configured or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: HbaseDriver <source table> <target table>");
            System.exit(2);
        }

        // 1. Build the HBase-aware configuration and wrap it in a job.
        Configuration conf = HBaseConfiguration.create(new Configuration());
        Job job = Job.getInstance(conf);

        // 2. Set the class used to locate the job jar.
        job.setJarByClass(HbaseDriver.class);

        // A one-pass MapReduce scan gains nothing from the block cache and would
        // evict data that online reads need, so block caching is switched off.
        Scan scan = new Scan();
        scan.setCacheBlocks(false);

        // 3. Configure the mapper that reads from the source table.
        TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(args[0]), scan, HbaseMapper.class,
                ImmutableBytesWritable.class, Put.class, job);

        // 4. Configure the reducer that writes to the target table
        //    (the number of reduce tasks is left at the framework default).
        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);

        // 5. Submit and wait for completion.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
(4)打包執行任務
$ ~/modules/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar com.luomk.HbaseDriver fruit fruit_mr
提示:執行任務前,如果待資料匯入的表不存在,則需要提前建立。
提示:maven打包命令:-P local clean package或-P dev clean package install(將第三方jar包一同打包,需要外掛:maven-shade-plugin)
4.自定義HBase-MapReduce2
目標:實現將HDFS中的資料寫入到HBase表中。
分步實現:
(0) 新建專案後在pom.xml中新增依賴:
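<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">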
<modelVersion>4.0.0</modelVersion>
<groupId>com.luomk</groupId>
<artifactId>HBaseMapReducer2</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
</project>
(1)構建HbaseMapper類,用於讀取HDFS中的檔案資料
package com.luomk;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the HDFS-to-HBase import job: turns each tab-separated input
 * line of the form {@code rowkey<TAB>name<TAB>color} into a {@link Put} that
 * stores the values in {@code info:name} and {@code info:color}.
 *
 * <p>Lines with fewer than three fields or an empty row key are skipped and
 * counted under the {@code HbaseMapper / MALFORMED_LINES} counter.
 */
public class HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /** Column family every imported value is written to. */
    private static final byte[] FAMILY = Bytes.toBytes("info");
    /** Qualifier for the second input field. */
    private static final byte[] NAME = Bytes.toBytes("name");
    /** Qualifier for the third input field. */
    private static final byte[] COLOR = Bytes.toBytes("color");
    /** Minimum number of tab-separated fields a usable line must have. */
    private static final int REQUIRED_FIELDS = 3;

    /** Output key object, reused across calls to avoid a per-record allocation. */
    private final ImmutableBytesWritable k = new ImmutableBytesWritable();

    /**
     * Parses one input line and emits the corresponding (row key, Put) pair.
     *
     * @param key     offset of the line in the input (unused)
     * @param value   the text of the line
     * @param context MapReduce context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");

        // Blank or truncated lines would otherwise throw ArrayIndexOutOfBoundsException,
        // and an empty row key makes the Put constructor throw; skip and count them.
        if (split.length < REQUIRED_FIELDS || split[0].isEmpty()) {
            context.getCounter("HbaseMapper", "MALFORMED_LINES").increment(1);
            return;
        }

        byte[] rowKey = Bytes.toBytes(split[0]);
        k.set(rowKey);

        Put put = new Put(rowKey);
        put.addColumn(FAMILY, NAME, Bytes.toBytes(split[1]));
        put.addColumn(FAMILY, COLOR, Bytes.toBytes(split[2]));
        context.write(k, put);
    }
}
(2)構建HbaseReducer類
package com.luomk;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import java.io.IOException;
/**
 * Pass-through reducer for the HDFS-to-HBase import job: each {@link Put}
 * produced by the mappers is written out as-is under its row key.
 */
public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    /**
     * Writes every {@link Put} of the group to the job output.
     *
     * @param key     row key shared by all values of this group
     * @param values  the Puts emitted by the mappers for this row key
     * @param context MapReduce context used to emit the output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (java.util.Iterator<Put> cursor = values.iterator(); cursor.hasNext(); ) {
            context.write(key, cursor.next());
        }
    }
}
(3)呼叫執行
package com.luomk;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
 * Driver for the HDFS-to-HBase import job.
 *
 * <p>Usage: {@code HbaseDriver <HDFS input path> <target table>}. The input
 * files are parsed by {@code HbaseMapper} and the resulting Puts are written
 * to the target table by {@code HbaseReducer}.
 */
public class HbaseDriver {

    /**
     * Configures and submits the job, then exits the JVM with 0 on success,
     * 1 if the job failed, or 2 if the arguments are missing.
     *
     * @param args {@code args[0]} is the HDFS input path, {@code args[1]} the target table name
     * @throws IOException            if the job cannot be configured or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: HbaseDriver <HDFS input path> <target table>");
            System.exit(2);
        }

        // Build the HBase-aware configuration and wrap it in a job.
        Configuration conf = HBaseConfiguration.create(new Configuration());
        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseDriver.class);

        // Map side: plain file input, emitting (row key, Put) pairs.
        job.setMapperClass(HbaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Reduce side: write the Puts into the target HBase table.
        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
(4)打包執行
$ ~/modules/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar com.luomk.HbaseDriver /input_fruit/fruit.tsv fruit_mr
提示:執行任務前,如果待資料匯入的表不存在,則需要提前建立之。
提示:maven打包命令:-P local clean package或-P dev clean package install(將第三方jar包一同打包,需要外掛:maven-shade-plugin)
5.原始碼下載地址: