5. HBase Application: MapReduce
Use MapReduce to work with HBase table data, for example to migrate data within HBase by extracting rows from one table and importing them into another.
1. First, create a new Maven project and add the required HBase dependencies.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.lv</groupId>
    <artifactId>hbase-study</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.1.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>

    <build>
        <finalName>hbase-study</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>cn.lv.mr.Emp2BasicMapReduce</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
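With this POM in place, packaging the project should produce the fat jar used in step 3. A minimal build sketch follows; the artifact name simply comes from the finalName plus the jar-with-dependencies descriptor, so adjust it if your setup differs:

# package the project; the assembly plugin builds the fat jar during the package phase
mvn clean package

# the runnable jar ends up under target/
ls target/hbase-study-jar-with-dependencies.jar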
2. Write the code that extracts data from the emp table and imports it into the basic table.
package cn.lv.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

/**
 * Migrate data between HBase tables: from emp table to basic table.
 *
 * @author lw
 */
public class Emp2BasicMapReduce {

    // Mapper
    // emp,info,{name='zhangsan'}
    // emp,info,{age='22'}
    public static class ReadEmp2BasicMapper extends TableMapper<Text, Put> {

        public Text outputKey = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // row key
            String rowKey = Bytes.toString(key.get());
            outputKey.set(rowKey);

            // output value
            Put put = new Put(key.get());

            // iterate over all cells of the current row
            for (Cell cell : value.rawCells()) {
                // keep family : info
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // keep column : name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                    // keep column : age
                    if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }
            context.write(outputKey, put);
        }
    }

    // Reducer
    public static class WriteBasicReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Put> value, Context context)
                throws IOException, InterruptedException {
            for (Put put : value) {
                context.write(null, put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // configuration
        Configuration conf = HBaseConfiguration.create();

        // create job
        Job job = Job.getInstance(conf, Emp2BasicMapReduce.class.getSimpleName());

        // set run job jar class
        job.setJarByClass(Emp2BasicMapReduce.class);

        // create hbase scan
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500); // default is 1

        // set other scan attrs
        TableMapReduceUtil.initTableMapperJob("emp",   // input table
                scan,                      // scan instance to control cf and attribute selection
                ReadEmp2BasicMapper.class, // mapper class
                Text.class,                // mapper output key
                Put.class,                 // mapper output value
                job);

        TableMapReduceUtil.initTableReducerJob("basic", // output table
                WriteBasicReducer.class,   // reducer class
                job);

        job.setNumReduceTasks(1); // reduce task num

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
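Note that the job only reads from emp and writes into basic; it does not create the target table, so both tables must already exist with the info column family. If you are starting from scratch, an HBase shell session along these lines would prepare them; the sample rows are only a sketch that matches the scan output shown at the end:

hbase(main):001:0> create 'emp', 'info'
hbase(main):002:0> create 'basic', 'info'
hbase(main):003:0> put 'emp', '10001', 'info:name', 'zhangsan'
hbase(main):004:0> put 'emp', '10001', 'info:age', '22'
hbase(main):005:0> put 'emp', '10001', 'info:address', 'hebei sjz yuhua'
hbase(main):006:0> put 'emp', '10002', 'info:name', 'lisi'
hbase(main):007:0> put 'emp', '10002', 'info:age', '24'
hbase(main):008:0> put 'emp', '10002', 'info:address', 'henan zhengzhou erqi'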
3. Run the jar package
Note: when running the job, make sure the HBase jars are added to the HADOOP_CLASSPATH environment variable, i.e. add the following to hadoop-env.sh:
export HBASE_HOME=/usr/hdp/2.5.3.0-37/hbase
export HADOOP_HOME=/usr/hdp/2.5.3.0-37/hadoop
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath`
Otherwise, the job will fail with:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
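Before submitting the job, one optional way to confirm the HBase jars are now visible to Hadoop is to inspect the resolved classpath, for example:

# should list hbase-client, hbase-server and related jars
hadoop classpath | tr ':' '\n' | grep -i hbase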
With the environment variables above in place, run the jar as follows:
# run the job
yarn jar hbase-study-jar-with-dependencies.jar
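The main class here is taken from the manifest configured in the assembly plugin above. If the manifest entry is not picked up (for example, when the jar is built differently), the class can also be passed explicitly, as in this sketch:

yarn jar hbase-study-jar-with-dependencies.jar cn.lv.mr.Emp2BasicMapReduce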
Verify the result (only name and age were migrated):
hbase(main):005:0> scan 'emp'
ROW COLUMN+CELL
10001 column=info:address, timestamp=1540373935927, value=hebei sjz yuhua
10001 column=info:age, timestamp=1540373917799, value=22
10001 column=info:name, timestamp=1540373904273, value=zhangsan
10002 column=info:address, timestamp=1540373978440, value=henan zhengzhou erqi
10002 column=info:age, timestamp=1540373956551, value=24
10002 column=info:name, timestamp=1540373947414, value=lisi
2 row(s) in 0.0270 seconds
hbase(main):006:0> scan 'basic'
ROW COLUMN+CELL
10001 column=info:age, timestamp=1540373917799, value=22
10001 column=info:name, timestamp=1540373904273, value=zhangsan
10002 column=info:age, timestamp=1540373956551, value=24
10002 column=info:name, timestamp=1540373947414, value=lisi
2 row(s) in 0.0290 seconds