[HBase] HBase Components, MapReduce Integration, and Data Migration
阿新 • Published: 2019-01-02
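1. HBase Architecture
The individual components are described below:
(1) Client
- The entry point for accessing the whole HBase cluster;
- Uses the HBase RPC mechanism to communicate with HMaster and HRegionServer;
- Communicates with HMaster for administrative operations;
- Communicates with HRegionServer for data read and write operations;
- Contains the interfaces for accessing HBase and maintains a cache to speed up access to HBase.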
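The client-side cache in the last item can be pictured as a sorted map from region start keys to server locations. The following is a minimal sketch in plain Java, not the HBase client's actual implementation: the class `RegionLocationCache`, its methods, and the server names are hypothetical, and `String` comparison stands in for HBase's byte-wise row key ordering.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a client-side region location cache.
// Regions are keyed by their start row key; a row belongs to the region
// with the greatest start key that is less than or equal to the row key.
class RegionLocationCache {
    private final TreeMap<String, String> startKeyToServer = new TreeMap<>();

    // Remember that the region starting at startKey is served by server.
    void put(String startKey, String server) {
        startKeyToServer.put(startKey, server);
    }

    // Return the cached server for rowKey, or null on a cache miss.
    // On a miss, a real client would have to look the location up remotely.
    String locate(String rowKey) {
        Map.Entry<String, String> entry = startKeyToServer.floorEntry(rowKey);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        RegionLocationCache cache = new RegionLocationCache();
        cache.put("", "regionserver-1");      // first region: empty start key
        cache.put("10003", "regionserver-2"); // second region starts at row 10003
        System.out.println(cache.locate("10002")); // regionserver-1
        System.out.println(cache.locate("10004")); // regionserver-2
    }
}
```

Once a location is cached, repeated reads and writes for rows in the same region can go straight to the right server without another lookup.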
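(2) ZooKeeper
- Ensures that there is only one active HMaster in the cluster at any time;
- Stores the addressing entry point for all HRegions;
- Monitors HRegionServers coming online and going offline in real time, and notifies HMaster immediately;
- Stores HBase schema and table metadata;
- The ZooKeeper quorum stores the address of the Meta table and the address of the HMaster.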
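(3) HMaster
- HMaster is not a single point of failure: several HMasters can be started in HBase, and ZooKeeper's master election mechanism ensures that one master is always running. HMaster is mainly responsible for managing tables and regions.
- Manages users' create, delete, modify, and query operations on tables;
- Manages load balancing across HRegionServers and adjusts the distribution of regions;
- After a region split, assigns the new regions;
- After an HRegionServer goes down, migrates the regions that were on the failed HRegionServer.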
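The last item, moving regions off a failed HRegionServer, can be sketched as follows. This is a simplified model under stated assumptions, not HBase's actual balancer: the class `RegionReassigner`, the "fewest regions first" rule, and the server and region names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: move the regions of a failed server to the surviving
// servers, always choosing the server that currently holds the fewest regions.
class RegionReassigner {
    static Map<String, List<String>> reassign(Map<String, List<String>> assignment,
                                              String failedServer) {
        // Copy the surviving servers so the input map is left untouched.
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            if (!e.getKey().equals(failedServer)) {
                result.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        List<String> orphaned = assignment.get(failedServer);
        if (orphaned == null || result.isEmpty()) {
            return result; // nothing to move, or no server left to take the regions
        }
        for (String region : orphaned) {
            // Pick the least-loaded surviving server for this region.
            String target = null;
            for (String server : result.keySet()) {
                if (target == null || result.get(server).size() < result.get(target).size()) {
                    target = server;
                }
            }
            result.get(target).add(region);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignment = new TreeMap<>();
        assignment.put("rs1", Arrays.asList("r1", "r2"));
        assignment.put("rs2", Arrays.asList("r3"));
        assignment.put("rs3", Arrays.asList("r4", "r5"));
        System.out.println(reassign(assignment, "rs3"));
    }
}
```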
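(4) HRegionServer
- Maintains HRegions, handles I/O requests to these HRegions, and reads and writes data in the HDFS file system;
- Splits HRegions that grow too large at run time;
- A client accessing data in HBase does not need the master to take part (addressing goes through ZooKeeper and HRegionServer, and data reads and writes go to HRegionServer). HMaster only maintains the metadata of tables and regions, so its load is very low.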
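(5) ZooKeeper
- HBase depends on ZooKeeper;
- By default, HBase manages the ZooKeeper instance, for example starting and stopping ZooKeeper;
- HMaster and the HRegionServers register with ZooKeeper when they start;
- With ZooKeeper in place, HMaster is no longer a single point of failure.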
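How an election keeps exactly one HMaster active can be illustrated with a small in-memory simulation. This sketch does not use the ZooKeeper API: the class `MasterElection` and its methods are hypothetical, a single field stands in for the znode held by the active master, and the first waiting standby takes over, whereas real standbys would compete for the node.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of master election.
class MasterElection {
    private String activeMaster; // stands in for the znode held by the active master
    private final List<String> standby = new ArrayList<>();

    // A master starts up and tries to become active.
    // Returns true if it became the active master, false if it is now a standby.
    boolean register(String master) {
        if (activeMaster == null) {
            activeMaster = master;
            return true;
        }
        standby.add(master);
        return false;
    }

    // The session of the given master expires (for example, the process died).
    void expire(String master) {
        if (master.equals(activeMaster)) {
            // The active master is gone: promote the first standby, if any.
            activeMaster = standby.isEmpty() ? null : standby.remove(0);
        } else {
            standby.remove(master);
        }
    }

    String active() {
        return activeMaster;
    }

    public static void main(String[] args) {
        MasterElection election = new MasterElection();
        election.register("master-a");
        election.register("master-b");
        System.out.println(election.active()); // master-a
        election.expire("master-a");
        System.out.println(election.active()); // master-b
    }
}
```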
2. Integrating HBase with MapReduce
(1) Add environment variables
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar
(2) Example: integrate HBase with MapReduce to export part of the data in the user table to the basic table.
User2BasicMapReduce.java
package com.beifeng.senior.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class User2BasicMapReduce extends Configured implements Tool {

    // Mapper Class
    public static class ReadUserMapper extends TableMapper<Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
                throws IOException, InterruptedException {
            // get rowkey
            String rowkey = Bytes.toString(key.get());
            // set
            mapOutputKey.set(rowkey);
            // --------------------------------------------------------
            Put put = new Put(key.get());
            // iterator
            for (Cell cell : value.rawCells()) {
                // add family : info
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column: name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                    // add column : age
                    if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }
            // context write
            context.write(mapOutputKey, put);
        }
    }

    // Reducer Class
    public static class WriteBasicReducer extends TableReducer<Text, Put, //
            ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<Put> values,
                Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(null, put);
            }
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        // set run job class
        job.setJarByClass(this.getClass());
        // set job
        Scan scan = new Scan();
        scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs
        // set input and set mapper
        TableMapReduceUtil.initTableMapperJob(
                "user", // input table
                scan, // Scan instance to control CF and attribute selection
                ReadUserMapper.class, // mapper class
                Text.class, // mapper output key
                Put.class, // mapper output value
                job //
        );
        // set reducer and output
        TableMapReduceUtil.initTableReducerJob(
                "basic", // output table
                WriteBasicReducer.class, // reducer class
                job //
        );
        job.setNumReduceTasks(1); // at least one, adjust as required
        // submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = HBaseConfiguration.create();
        // submit job
        int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
        // exit program
        System.exit(status);
    }
}
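The column selection done in the mapper above (keep only info:name and info:age) can be tried out without a cluster. The following is a minimal sketch in plain Java: `SimpleCell` and `ColumnSelector` are hypothetical stand-ins, not HBase classes, and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an HBase cell: family, qualifier and value as strings.
class SimpleCell {
    final String family;
    final String qualifier;
    final String value;

    SimpleCell(String family, String qualifier, String value) {
        this.family = family;
        this.qualifier = qualifier;
        this.value = value;
    }
}

class ColumnSelector {
    private static final Set<String> WANTED = new HashSet<>(Arrays.asList("name", "age"));

    // Keep only the cells in family "info" whose qualifier is "name" or "age".
    static List<SimpleCell> select(List<SimpleCell> cells) {
        List<SimpleCell> kept = new ArrayList<>();
        for (SimpleCell cell : cells) {
            if ("info".equals(cell.family) && WANTED.contains(cell.qualifier)) {
                kept.add(cell);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<SimpleCell> row = Arrays.asList(
                new SimpleCell("info", "name", "zhaoliu"),
                new SimpleCell("info", "address", "shanghai"),
                new SimpleCell("info", "age", "25"));
        for (SimpleCell cell : select(row)) {
            System.out.println(cell.family + ":" + cell.qualifier + " -> " + cell.value);
        }
    }
}
```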
3. Migrating Data into HBase
- Use the HBase Put API
- Use the HBase bulk load tool
- Use a custom MapReduce job
(1) HBase Bulk Load tool
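A MapReduce job that writes to HBase usually uses TableOutputFormat, generating Put objects in the reducer and writing them directly to HBase. With large data volumes this approach is inefficient (HBase blocks writes and frequently performs I/O-heavy operations such as flush, split, and compact), and it affects the stability of the HBase nodes (long GC pauses and slow responses can cause nodes to time out and exit, triggering a chain of further failures).
HBase also supports loading data by bulk load. It relies on the fact that HBase stores its data in HDFS in a specific format: persistent HFile-format files are generated directly in HDFS and then moved to the appropriate location, which completes a fast load of a very large amount of data. Done with MapReduce, this is efficient and convenient, does not occupy region resources or add load, greatly improves write efficiency for large data volumes, and reduces write pressure on the HBase nodes.
Generating HFiles first and then bulk loading them into HBase, instead of writing through TableOutputFormat as before, has the following advantages:
- It removes the insert pressure on the HBase cluster;
- It speeds up the job and shortens its running time.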
Bulk Load workflow:
- MapReduce converts the *.csv files into HFiles;
- bulk load loads the HFiles into the HBase table.
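For the first step of this workflow to yield loadable files, the rows written to each HFile need to be sorted by row key and must fit within a single region. The sketch below illustrates only that grouping idea in plain Java; it is not the code HBase runs, the class `BulkLoadPartitioner` is hypothetical, and `String` ordering stands in for byte-wise row key ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: group row keys by target region and sort each group.
class BulkLoadPartitioner {
    // regionStartKeys must contain the empty string, the start key of the first region.
    static Map<String, List<String>> partition(List<String> rowKeys,
                                               List<String> regionStartKeys) {
        if (!regionStartKeys.contains("")) {
            throw new IllegalArgumentException("missing start key of the first region");
        }
        TreeMap<String, List<String>> byRegion = new TreeMap<>();
        for (String startKey : regionStartKeys) {
            byRegion.put(startKey, new ArrayList<String>());
        }
        for (String rowKey : rowKeys) {
            // The owning region has the greatest start key <= rowKey.
            byRegion.floorEntry(rowKey).getValue().add(rowKey);
        }
        for (List<String> keys : byRegion.values()) {
            Collections.sort(keys); // each output file must be sorted by row key
        }
        return byRegion;
    }

    public static void main(String[] args) {
        List<String> rowKeys = Arrays.asList("10004", "10001", "10003", "10002");
        List<String> startKeys = Arrays.asList("", "10003");
        System.out.println(partition(rowKeys, startKeys));
        // prints {=[10001, 10002], 10003=[10003, 10004]}
    }
}
```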
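Note that, as written, without the -Dimporttsv.bulk.output option, importtsv writes the rows into the table through the normal write path; to produce HFiles for a bulk load, that option has to point to an HDFS output directory, and the generated files are then loaded with completebulkload. The command is as follows: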
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
student \
hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/hbase/importtsv
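The importtsv.columns option in the command above maps each field of an input line, by position, to either the row key (HBASE_ROW_KEY) or a family:qualifier column. A minimal plain-Java sketch of that mapping follows; `TsvLineMapper` is a hypothetical helper, not the ImportTsv implementation, and the sample line is made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of how a column specification is applied to one TSV line.
class TsvLineMapper {
    // Returns column name -> field value, in the order of the specification.
    // The row key is stored under the name "HBASE_ROW_KEY".
    static Map<String, String> parse(String columnSpec, String line) {
        String[] columns = columnSpec.split(",");
        String[] fields = line.split("\t", -1); // -1 keeps trailing empty fields
        if (columns.length != fields.length) {
            throw new IllegalArgumentException(
                    "expected " + columns.length + " fields but found " + fields.length);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            result.put(columns[i], fields[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        String spec = "HBASE_ROW_KEY,info:name,info:age,info:sex,info:address,info:phone";
        String line = "10001\tzhangsan\t25\tmale\tshanghai\t13800000000";
        for (Map.Entry<String, String> e : parse(spec, line).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

A line with too few or too many fields is rejected here with an exception; how the real tool treats such lines is outside the scope of this sketch.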
4. Example CRUD Operations on an HBase Table
HBaseOperation.java
package com.beifeng.senior.hadoop.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
/**
* CRUD Operations
*
*/
public class HBaseOperation {

    public static HTable getHTableByTableName(String tableName) throws Exception {
        // Get instance of Default Configuration
        Configuration configuration = HBaseConfiguration.create();
        // Get table instance
        HTable table = new HTable(configuration, tableName);
        return table;
    }

    public void getData() throws Exception {
        String tableName = "user"; // default.user / hbase:meta
        HTable table = getHTableByTableName(tableName);
        // Create Get with rowkey
        Get get = new Get(Bytes.toBytes("10002")); // "10002".toBytes()
        // ==========================================================================
        // add column
        get.addColumn(//
                Bytes.toBytes("info"), //
                Bytes.toBytes("name"));
        get.addColumn(//
                Bytes.toBytes("info"), //
                Bytes.toBytes("age"));
        // Get Data
        Result result = table.get(get);
        // Key : rowkey + cf + c + version
        // Value: value
        for (Cell cell : result.rawCells()) {
            System.out.println(//
                    Bytes.toString(CellUtil.cloneFamily(cell)) + ":" //
                            + Bytes.toString(CellUtil.cloneQualifier(cell)) + " ->" //
                            + Bytes.toString(CellUtil.cloneValue(cell)));
        }
        // Table Close
        table.close();
    }

    /**
     * Suggestion: keep the table name and column family as constants, e.g. in HBaseTableContent
     * Map<String,Object>
     * @throws Exception
     */
    public void putData() throws Exception {
        String tableName = "user"; // default.user / hbase:meta
        HTable table = getHTableByTableName(tableName);
        Put put = new Put(Bytes.toBytes("10004"));
        // Add a column with value
        put.add(//
                Bytes.toBytes("info"), //
                Bytes.toBytes("name"), //
                Bytes.toBytes("zhaoliu")//
        );
        put.add(//
                Bytes.toBytes("info"), //
                Bytes.toBytes("age"), //
                Bytes.toBytes(25)// stored as a 4-byte int, not as the string "25"
        );
        put.add(//
                Bytes.toBytes("info"), //
                Bytes.toBytes("address"), //
                Bytes.toBytes("shanghai")//
        );
        table.put(put);
        table.close();
    }

    public void delete() throws Exception {
        String tableName = "user"; // default.user / hbase:meta
        HTable table = getHTableByTableName(tableName);
        Delete delete = new Delete(Bytes.toBytes("10004"));
        /*
         * delete.deleteColumn(Bytes.toBytes("info"),//
         * Bytes.toBytes("address"));
         */
        delete.deleteFamily(Bytes.toBytes("info"));
        table.delete(delete);
        table.close();
    }

    public static void main(String[] args) throws Exception {
        String tableName = "user"; // default.user / hbase:meta
        HTable table = null;
        ResultScanner resultScanner = null;
        try {
            table = getHTableByTableName(tableName);
            Scan scan = new Scan();
            // Range
            scan.setStartRow(Bytes.toBytes("10001"));
            scan.setStopRow(Bytes.toBytes("10003"));
            // Scan scan2 = new Scan(Bytes.toBytes("10001"),Bytes.toBytes("10003"));
            // PrefixFilter
            // PageFilter
            // scan.setFilter(filter) ;
            // scan.setCacheBlocks(cacheBlocks);
            // scan.setCaching(caching);
            // scan.addColumn(family, qualifier)
            // scan.addFamily(family)
            resultScanner = table.getScanner(scan);
            for (Result result : resultScanner) {
                System.out.println(Bytes.toString(result.getRow()));
                // System.out.println(result);
                for (Cell cell : result.rawCells()) {
                    System.out.println(//
                            Bytes.toString(CellUtil.cloneFamily(cell)) + ":" //
                                    + Bytes.toString(CellUtil.cloneQualifier(cell)) + " ->" //
                                    + Bytes.toString(CellUtil.cloneValue(cell)));
                }
                System.out.println("---------------------------------------");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(resultScanner);
            IOUtils.closeStream(table);
        }
    }
}
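The scan in main above sets a start row of 10001 and a stop row of 10003. In HBase the start row is inclusive and the stop row is exclusive, so row 10003 itself is not returned. The plain-Java sketch below models this with a sorted map; `ScanRangeDemo` is a hypothetical class, and the table contents are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of a table as a map sorted by row key.
class ScanRangeDemo {
    // Return the row keys in [startRow, stopRow): start inclusive, stop exclusive.
    static List<String> scan(TreeMap<String, String> table, String startRow, String stopRow) {
        return new ArrayList<>(table.subMap(startRow, true, stopRow, false).keySet());
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        table.put("10001", "zhangsan");
        table.put("10002", "lisi");
        table.put("10003", "wangwu");
        table.put("10004", "zhaoliu");
        System.out.println(scan(table, "10001", "10003")); // prints [10001, 10002]
    }
}
```

To include row 10003 in such a range, the stop row has to be a key that sorts after it, for example 10004.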