HBase Operations
1. HBase API operations
1) First, put core-site.xml, hbase-site.xml and hdfs-site.xml under the resources directory of the Maven project
2) Configure the pom.xml file
Add the HBase dependencies:
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
3) Create HbaseTest.java
package com.hsiehchou.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HbaseTest {
//Configuration shared by all methods
public static Configuration conf;
//Load the configuration (reads hbase-site.xml and hdfs-site.xml from the classpath)
static{
conf = HBaseConfiguration.create();
}
Check whether a table exists in HBase
//1. Check whether a table exists in HBase
public static boolean isExist(String tableName) throws IOException{
//Table-level operations go through an Admin
//HBaseAdmin admin = new HBaseAdmin(conf); //old-style API, now deprecated
Connection connection = ConnectionFactory.createConnection(conf);
//Get the admin from the connection
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
return admin.tableExists(TableName.valueOf(tableName));
}
Create a table in HBase
//2. Create a table in HBase
public static void createTable(String tableName, String... columnFamily) throws IOException {
//1. Table-level operations go through the admin
Connection connection = ConnectionFactory.createConnection(conf);
HBaseAdmin admin = (HBaseAdmin)connection.getAdmin();
//2. Create the table descriptor
HTableDescriptor hd = new HTableDescriptor(TableName.valueOf(tableName));
//3. Add one or more column families
for(String cf:columnFamily){
hd.addFamily(new HColumnDescriptor(cf));
}
//4. Create the table
admin.createTable(hd);
System.out.println("Table created successfully!");
}
bin/hbase shell operations:
list
scan 'ni'
describe 'ni'
Add data to a table
//3. Add data to the table: put rowkey, cf (column family), column, value
public static void addData(String tableName, String rowkey, String cf, String column, String value) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tableName));
//Add data with a Put
Put put = new Put(Bytes.toBytes(rowkey));
//Specify the column family, column and value
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
table.put(put);
}
Delete a single row
//4. Delete a single row
public static void deleteRow(String tableName, String rowkey) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowkey));
table.delete(delete);
}
Delete the data of multiple rowkeys
//5. Delete the data of multiple rowkeys
public static void deleteMore(String tableName, String... rowkey) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tableName));
//Collect the Deletes
List<Delete> d = new ArrayList<Delete>();
//Iterate over the rowkeys
for(String rk:rowkey){
Delete dd = new Delete(Bytes.toBytes(rk));
d.add(dd);
}
table.delete(d);
}
Full table scan
//6. Scan the whole table
public static void scanAll(String tableName) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
ResultScanner rs = table.getScanner(scan);
//Iterate over the result rows
for(Result r:rs){
//Cells of one row
Cell[] cells = r.rawCells();
for(Cell c:cells) {
System.out.println("rowkey為:" + Bytes.toString(CellUtil.cloneRow(c)));
System.out.println("列族為:" + Bytes.toString(CellUtil.cloneFamily(c)));
System.out.println("值為:" + Bytes.toString(CellUtil.cloneValue(c)));
}
}
}
Delete a table
//7. Delete a table
public static void deleteTable(String tableName) throws IOException {
//Table-level operations go through the admin
Connection connection = ConnectionFactory.createConnection(conf);
HBaseAdmin admin = (HBaseAdmin)connection.getAdmin();
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
}
public static void main(String[] args) throws IOException {
//System.out.println(isExist("user"));
//create 'tableName','columnFamily'
//createTable("ni","info1","info2","info3");
//addData("ni","shanghai","info1","name","lilei");
//deleteRow("ni","shanghai");
//deleteMore("ni","shanghai1","shanghai2");
//scanAll("ni");
deleteTable("ni");
}
}
2. HBase-MR
HBase's strength is storing data; it is not designed for analysing it. To run computations over HBase data, combine it with Hadoop MapReduce.
View the jars required by HBase-MR:
bin/hbase mapredcp
Configure temporary environment variables:
export HBASE_HOME=/root/hd/hbase-1.3.0
export HADOOP_HOME=/root/hd/hadoop-2.8.4
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
Run an HBase-MR job (the built-in rowcounter counts the rows of the user table):
bin/yarn jar /root/hd/hbase-1.3.0/lib/hbase-server-1.3.0.jar rowcounter user
3. HBase table operations
Scenario 1:
Read an HBase table (one input split per region) and filter specific columns: of the columns name, age and high, keep only name, writing the result into another HBase table.
Code implementation
ReadLoveMapper.java
package com.hsiehchou.mr;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* HBase-MR
* Mapper class that reads the data from the source table
* key: ImmutableBytesWritable, the rowkey in HBase
* value: Result, one row of data
*/
public class ReadLoveMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//1. Read the data: build a Put keyed by the rowkey
Put put = new Put(key.get());
//2. Filter columns, cell by cell
for (Cell c:value.rawCells()){
//Keep only cells from the info column family; everything else is filtered out
if("info".equals(Bytes.toString(CellUtil.cloneFamily(c)))){
//Keep only the name column
if("name".equals(Bytes.toString(CellUtil.cloneQualifier(c)))){
put.add(c);
}
}
}
//3. Emit to the reducer
context.write(key,put);
}
}
WriteLoveReducer.java
package com.hsiehchou.mr;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
/**
* keyIn: ImmutableBytesWritable
* valueIn: Put
* keyOut: NullWritable (the Put already carries the rowkey, so no output key is needed)
*/
public class WriteLoveReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for (Put p:values){
context.write(NullWritable.get(),p);
}
}
}
LoverDriver.java
package com.hsiehchou.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class LoverDriver implements Tool {
private Configuration conf;
public void setConf(Configuration configuration) {
this.conf = HBaseConfiguration.create(configuration);
}
public Configuration getConf() {
return this.conf;
}
public int run(String[] strings) throws Exception {
//1. Create the job
Job job = Job.getInstance(conf);
//2. Set the jar/main class
job.setJarByClass(LoverDriver.class);
//3. Configure the job: build the scan
Scan scan = new Scan();
//4. Set the mapper (reads from the love table)
TableMapReduceUtil.initTableMapperJob("love",
scan,
ReadLoveMapper.class,
ImmutableBytesWritable.class,
Put.class,
job
);
//5. Set the reducer (writes to the lovemr table)
TableMapReduceUtil.initTableReducerJob("lovemr",
WriteLoveReducer.class,
job
);
//6. Set the number of reduce tasks
job.setNumReduceTasks(1);
boolean rs = job.waitForCompletion(true);
return rs?0:1;
}
public static void main(String[] args) {
try {
//exit status code
int sts = ToolRunner.run(new LoverDriver(), args);
System.exit(sts);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Scenario 2:
Import data from HDFS into an HBase table (HBase-MR).
Code implementation
ReadHdfsMapper.java
package com.hsiehchou.mr1;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Read data from HDFS
* HDFS -> HBase
*/
public class ReadHdfsMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. Read one line
String line = value.toString();
//2. Split the fields
String[] fields = line.split("\t");
//3. Convert the fields to bytes
byte[] rowkey = Bytes.toBytes(fields[0]);
byte[] name = Bytes.toBytes(fields[1]);
byte[] desc = Bytes.toBytes(fields[2]);
//4. Wrap them into a Put
Put put = new Put(rowkey);
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),name);
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("desc"),desc);
//5. Emit to the reducer
context.write(new ImmutableBytesWritable(rowkey),put);
}
}
WriteHbaseReducer.java
package com.hsiehchou.mr1;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class WriteHbaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for(Put p:values){
context.write(NullWritable.get(),p);
}
}
}
LoveDriver.java
package com.hsiehchou.mr1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class LoveDriver implements Tool {
private Configuration conf = null;
public void setConf(Configuration configuration) {
this.conf = HBaseConfiguration.create(configuration);
}
public Configuration getConf() {
return this.conf;
}
public int run(String[] strings) throws Exception {
//1. Create the job (pass in the HBase-aware configuration)
Job job = Job.getInstance(conf);
job.setJarByClass(LoveDriver.class);
//2. Configure the mapper
job.setMapperClass(ReadHdfsMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//3. Configure the reducer (writes to the lovehdfs table)
TableMapReduceUtil.initTableReducerJob("lovehdfs", WriteHbaseReducer.class, job);
//4. Input configuration: read from HDFS
FileInputFormat.addInputPath(job,new Path("/lovehbase/"));
//5. No output format is needed; the reducer already targets an HBase table
return job.waitForCompletion(true)? 0:1;
}
public static void main(String[] args) {
try {
int sts = ToolRunner.run(new LoveDriver(),args);
System.exit(sts);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4. HBase optimization
1) Pre-splitting (pre-partitioning)
An HBase table can be very large (the BigTable model) and the data volume is huge, so a table is split into regions. Each region stores part of the data and maintains a contiguous rowkey range {startRowKey, endRowKey}; for example, one region might hold rowkeys 1~2000 and the next 2001~4000.
How many splits should a table have? Plan this in advance to improve HBase performance: pre-split the rowkey ranges before loading any data.
In practice (hbase shell):
create 'user_p','info','partition',SPLITS => ['201','202','203','204']
Table Regions
Region Server | Start Key | End Key |
hsiehchou123:16020 | -∞ | 201 |
hsiehchou124:16020 | 201 | 202 |
hsiehchou124:16020 | 202 | 203 |
hsiehchou123:16020 | 203 | 204 |
hsiehchou122:16020 | 204 | +∞ |
create 'user_pppp','partition',SPLITS_FILE => 'partitions.txt'
partitions.txt must be placed in the directory from which the hbase shell is started
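The same pre-split can also be created from the Java client API by passing split keys to createTable. A minimal sketch, assuming the HBase 1.3.0 client from the pom.xml above; the table name user_p_api is only an illustration:
package com.hsiehchou.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class PreSplitTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        HTableDescriptor hd = new HTableDescriptor(TableName.valueOf("user_p_api"));
        hd.addFamily(new HColumnDescriptor("info"));
        //Split keys produce the regions (-∞,201), [201,202), [202,203), [203,204), [204,+∞)
        byte[][] splitKeys = {
            Bytes.toBytes("201"), Bytes.toBytes("202"),
            Bytes.toBytes("203"), Bytes.toBytes("204")
        };
        admin.createTable(hd, splitKeys);
        admin.close();
        connection.close();
    }
}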
2) How to design rowkeys
The rowkey is the unique identifier of a record; which region a record lands in is determined by the pre-split rowkey ranges.
Design rowkeys sensibly: if the data is stored across, say, 5 regions, each region should hold roughly the same amount of data, so rowkeys should be scattered as evenly as possible across the regions.
Solutions:
Random numbers / hash or digest values:
the original rowkey 201 becomes, after hashing, something like dfgyfugpgdcjhgfd11412nod
and 202 becomes 21dqddwdgjohfxsovbxiufq12
String concatenation:
20190316_a3d4
20190316_g04f
String reversal:
201903161 -> 161309102
201903162 -> 261309102
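A minimal sketch of the hashing/salting and reversal ideas above; the class name, helper names and the bucket count of 5 are illustrative assumptions, not part of the original notes:
public class RowKeyUtil {
    //Salting/hashing: prefix the original rowkey with a bucket id derived from its hash,
    //so that consecutive keys such as 201, 202, ... are scattered across regions.
    //The bucket count (5 here) would normally match the number of pre-split regions.
    public static String salted(String rowkey) {
        int bucket = (rowkey.hashCode() & Integer.MAX_VALUE) % 5;
        return bucket + "_" + rowkey;
    }
    //Reversal: 201903161 -> 161309102, which breaks up monotonically increasing
    //prefixes such as dates and spreads the writes over more regions.
    public static String reversed(String rowkey) {
        return new StringBuilder(rowkey).reverse().toString();
    }
    public static void main(String[] args) {
        System.out.println(salted("201"));
        System.out.println(reversed("201903161")); //prints 161309102
    }
}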
3) Basic HBase tuning
HBase stores its data on HDFS, so start with HDFS settings in hdfs-site.xml:
dfs.datanode.max.transfer.threads: the maximum number of transfer threads (open files) a DataNode may use; the default is 4096, increase it.
dfs.image.transfer.timeout: transfer timeout; the default is 60000 ms, increase it.
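For example, the two properties above could be raised in hdfs-site.xml (the values shown are illustrative; tune them for your cluster):
<property>
    <name>dfs.datanode.max.transfer.threads</name>
    <value>8192</value>
</property>
<property>
    <name>dfs.image.transfer.timeout</name>
    <value>120000</value>
</property>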
Memory tuning:
Set the JVM heap sizes in hadoop-env.sh; giving roughly 30%~40% of the machine's memory to the heap works best (for example somewhere between 512m and 2G per daemon), e.g.:
export HADOOP_PORTMAP_OPTS="-Xmx512m $HADOOP_PORTMAP_OPTS"