重溫大資料---Hbase架構進階
這一講主要是對Hbase JavaApi使用的介紹,程式設計還是挺簡單的,重點在於理解程式設計實現的過程。其次深入講解了Hbase的架構。以及Hbase如何實現資料的遷移。
Hbase Java API
Hbase提供了java開發的介面,可以使用java語言對Hbase資料庫進行操作。
- jar包依賴 server client
- 配置檔案匯入 hdfs-seite.xml core-site.xml hbase-site.xml
程式碼內容很簡單,我也打上了註釋就不多說了。
例項程式碼:
package com.beifeng.senior.hadoop.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client. Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache. hadoop.io.IOUtils;
/**
* CRUD Operations
*
* @author xianglei
*
*/
public class HBaseOperation {
public static HTable getHTableByTableName(String tableName) throws Exception {
// 讀取預設配置檔案資訊
Configuration configuration = HBaseConfiguration.create();
//獲取表的例項
HTable table = new HTable(configuration, tableName);
return table;
}
public void getData() throws Exception {
String tableName = "user"; // 預設省略->命名框架+名稱:default.user / hbase:meta
HTable table = getHTableByTableName(tableName);
// 獲取一個有著Rowkey的Get 對應shell -->get 'user','10002','info:name'
Get get = new Get(Bytes.toBytes("10002"));
// get 查詢 指定( 列蔟 列名)
get.addColumn(//
Bytes.toBytes("info"), //
Bytes.toBytes("name"));
get.addColumn(//
Bytes.toBytes("info"), //
Bytes.toBytes("age"));
// 獲取資料
Result result = table.get(get);
// Key : rowkey + cf + c + version
// Value: value
// 列印 列蔟:列名->值
for (Cell cell : result.rawCells()) {
System.out.println(//
Bytes.toString(CellUtil.cloneFamily(cell)) + ":" //
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + " ->" //
+ Bytes.toString(CellUtil.cloneValue(cell)));
}
// 關閉資源
table.close();
}
/**
* 建議 把tablename 和 columnfamily 定義為 常量 , 寫到一個常量工具類裡面 HBaseTableContent
*
* 對於插入的列名和值建議使用 Map<String,Obejct> 使用for進行add
*
* @throws Exception
*/
public void putData() throws Exception {
String tableName = "user";
HTable table = getHTableByTableName(tableName);
Put put = new Put(Bytes.toBytes("10004"));
// put 'user','10004','info:name','zhaoliu'
put.add(//
Bytes.toBytes("info"), //
Bytes.toBytes("name"), //
Bytes.toBytes("zhaoliu")//
);
put.add(//
Bytes.toBytes("info"), //
Bytes.toBytes("age"), //
Bytes.toBytes(25)//
);
put.add(//
Bytes.toBytes("info"), //
Bytes.toBytes("address"), //
Bytes.toBytes("shanghai")//
);
//放入資料
table.put(put);
table.close();
}
public void delete() throws Exception {
String tableName = "user"; // default.user / hbase:meta
HTable table = getHTableByTableName(tableName);
Delete delete = new Delete(Bytes.toBytes("10004"));
/* 刪除info列蔟下的address列的所有資料deleteColumn刪最新的deleteColumns刪除所有的
* delete.deleteColumn(Bytes.toBytes("info"),//
* Bytes.toBytes("address"));
*/
// rowkey為10004行info列蔟所有資料
delete.deleteFamily(Bytes.toBytes("info"));
table.delete(delete);
table.close();
}
public static void main(String[] args) throws Exception {
String tableName = "user";
HTable table = null;
ResultScanner resultScanner = null;
try {
table = getHTableByTableName(tableName);
Scan scan = new Scan();
// Range 範圍掃描 包頭不包尾
scan.setStartRow(Bytes.toBytes("10001"));
scan.setStopRow(Bytes.toBytes("10003")) ;
// 或者這樣寫
//Scan scan2 = new Scan(Bytes.toBytes("10001"),Bytes.toBytes("10003"));
// PrefixFilter
// PageFilter
//設定過濾器 會影響查詢速度
//scan.setFilter(filter) ;
//資料讀出後快取到blockcache下一次讀就直接從快取拿
//每一次獲取多少列
//scan.setCacheBlocks(cacheBlocks);
//scan.setCaching(caching);
// 查詢哪些列哪些列蔟
//scan.addColumn(family, qualifier)
//scan.addFamily(family)
resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
// result裡面存的是cell
System.out.println(Bytes.toString(result.getRow()));
//System.out.println(result);
for (Cell cell : result.rawCells()) {
System.out.println(//
Bytes.toString(CellUtil.cloneFamily(cell)) + ":" //
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + " ->" //
+ Bytes.toString(CellUtil.cloneValue(cell)));
}
System.out.println("---------------------------------------");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(resultScanner);
IOUtils.closeStream(table);
}
}
}
Hbase架構元件剖析
)
-
Client
- 叢集訪問入口
- 使用RPC機制與HM和HR進行通訊
- 與HM進行通訊進行管理類操作
- 與HR進行資料讀寫操作
- 維護cache加快對Hbase的訪問
-
Zookeeper
- 保證叢集只有一個HM,Hbase沒有單節點故障的概念,Hbase可以有多個HM但是隻有一個去管理
- 儲存所有HR的定址入口 meta
- 實時監控HS的上下線資訊,並且通知HM
- 儲存Hbase的schema的table元資料
-
HMaster
- Hbase沒有單節點故障的概念,Hbase可以有多個HM但是隻有一個去管理
- 管理使用者對錶的操作
- 管理HR的負載均衡,調整Region分佈
- Region Split後,負責新的Region的分佈
- HS失效後,負責失效的HS上的Region的遷移工作
-
HRegion Server
- 維護region處理IO請求
- 切分大Region
- 客戶端訪問資料不需要HM參與,定址訪問ZK和HS,資料讀寫訪問HS,HM僅僅維護table和Region的元資料。
Hbase整合MapReduce
Hbase繼承所需的jar包
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-common-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/protobuf-java-2.5.0.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-client-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-hadoop-compat-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-protocol-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/high-scale-lib-1.1.1.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/zookeeper-3.4.5.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/guava-12.0.1.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/htrace-core-2.04.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/netty-3.6.6.Final.jar
使用方法:
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar
// 此jar下面自帶的
CellCounter: Count cells in HBase table
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
TSV 格式
tab分隔
>> student.tsv
1001 zhangsan 26 shanghai
CSV 格式
逗號分隔
>> student.csv
1001,zhangsan,26,shanghai
completebulkload ★ ★ ★ ★ ★ ★ 直接變成hfile
file csv
|
hfile
|
load
程式碼例項
package com.beifeng.senior.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class User2BasicMapReduce extends Configured implements Tool {
// Mapper Class
public static class ReadUserMapper extends TableMapper<Text, Put> {
private Text mapOutputKey = new Text();
@Override
public void map(ImmutableBytesWritable key, Result value,
Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
throws IOException, InterruptedException {
// get rowkey
String rowkey = Bytes.toString(key.get());
// set map的輸出key
mapOutputKey.set(rowkey);
// --------------------------------------------------------
Put put = new Put(key.get());
// iterator
for (Cell cell : value.rawCells()) {
// add family : info
if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
// add column: name
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
// add column : age
if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
}
}
// context write
context.write(mapOutputKey, put);
}
}
// Reducer Class
public static class WriteBasicReducer extends TableReducer<Text, Put, //
ImmutableBytesWritable> {
@Override
public void reduce(Text key, Iterable<Put> values,
Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
throws IOException, InterruptedException {
for(Put put: values){
// 輸出已經定義好了
context.write(null, put);
}
}
}
// Driver
public int run(String[] args) throws Exception {
// create job
Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
// set run job class
job.setJarByClass(this.getClass());
// set job
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
// set input and set mapper
TableMapReduceUtil.initTableMapperJob(
"user", // input table
scan, // Scan instance to control CF and attribute selection
ReadUserMapper.class, // mapper class
Text.class, // mapper output key
Put.class, // mapper output value
job //
);
// set reducer and output
TableMapReduceUtil.initTableReducerJob(
"basic", // output table
WriteBasicReducer.class, // reducer class
job//
);
job.setNumReduceTasks(1); // at least one, adjust as required
// submit job
boolean isSuccess = job.waitForCompletion(true) ;
return isSuccess ? 0 : 1;
}
public static void main(String[] args) throws Exception {
// get configuration
Configuration configuration = HBaseConfiguration.create();
// submit job
int status = ToolRunner.run(configuration,new User2BasicMapReduce(),args) ;
// exit program
System.exit(status);
}
}
執行:
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar
Hbase 資料遷移
Hbase資料來至於Logs或者RDMS等。資料遷移的方式有
- Put API
- bilk load tool
- MapReduce job
準備樣本資料
10001 zhangsan35 male beijing 0109876543
10002 lisi 32 male shanghia 0109876563
10003 zhaoliu 35 female hangzhou 01098346543
10004 qianqi 35 male shenzhen 01098732543
-
importTSV
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2 export HADOOP_HOME=/opt/modules/hadoop-2.5.0 HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar \ ${HBASE_HOME}/lib/hbase-server-0.98.6-hadoop2.jar importtsv \ -Dimporttsv.columns=HBASE_ROW_KEY,\ info:name,info:age,info:sex,info:address,info:phone \ student \ hdfs://masterm:8020/user/hbase/importtsv
-
bulk load方式快速載入巨量資料
1、準備資料檔案
Bulk Load的第一步。會執行一個Mapreduce作業,當中使用到了HFileOutputFormat輸出HBase資料檔案:StoreFile。
HFileOutputFormat的作用在於使得輸出的HFile檔案能夠適應單個region。使用TotalOrderPartitioner類將map輸出結果分割槽到各個不同的key區間中,每一個key區間都相應著HBase表的region。
2、匯入HBase表
第二步使用completebulkload工具將第一步的結果檔案依次交給負責檔案相應region的RegionServer,並將檔案move到region在HDFS上的儲存資料夾中。一旦完畢。將資料開放給clients。
假設在bulk load準備匯入或在準備匯入與完畢匯入的臨界點上發現region的邊界已經改變,completebulkload工具會自己主動split資料檔案到新的邊界上。可是這個過程並非最佳實踐,所以使用者在使用時須要最小化準備匯入與匯入叢集間的延時,特別是當其它client在同一時候使用其它工具向同一張表匯入資料。
bulkload的優點
- 消除了對Hbase叢集的插入壓力
- 提高了Job的執行速度,降低了Job的執行時間
使用方法
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
-Dimporttsv.bulk.output=hdfs://master:8020/user/hbase/hfileoutput \
student2 \
hdfs://master:8020/user/hbase/importtsv
=======================================================================
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-hadoop2.jar \
completebulkload \
hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/hbase/hfileoutput \
student2
由於BulkLoad是繞過了Write to WAL,Write to MemStore及Flush to disk的過程,所以並不能通過WAL來進行一些複製資料的操作。
總結
這一塊的東西挺多的。詳見官網Hbase,我還得去看看官網的內容再來總結。