HBase Bulkload (Part 3)
III. Lesson Objectives
1. Master the HBase client API operations
2. Master integrating HBase with MapReduce
3. Master integrating HBase with Hive
4. Master rowkey design for HBase tables
5. Understand hotspotting in HBase tables
6. Master HBase table data backup
7. Master HBase secondary indexes
IV. Knowledge Points
1. HBase client API operations
- Create a Maven project and add the dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>
```
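The MapReduce examples in section 2 below also use classes from `org.apache.hadoop.hbase.mapreduce` (TableMapper, TableReducer, HFileOutputFormat2, LoadIncrementalHFiles), which in the 1.2.x line ship in the hbase-server artifact. If your build does not already pull it in transitively, a dependency along these lines is likely needed (the version matching the other artifacts is an assumption):

```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.1</version>
</dependency>
```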
- CRUD operations on HBase tables
For the detailed steps, see the ==《hbase表的增刪改查操作.md》== document
1. Write an init method to initialize the connection (the connection setup used by the examples is sketched right after this list)
2. Create a table
3. Modify table attributes
4. put: insert data
5. get: query a single row
6. scan: query data in batches
7. delete: delete column data from a table
8. Delete a table
9. Using filters
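The filter examples below call a `testScan` helper that reads from a `conn` field, which the init method from the referenced document is expected to create. A minimal sketch of such an init, assuming a hypothetical test class name and a ZooKeeper quorum of node1/node2/node3 (adjust to your cluster):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.Before;

public class HBaseFilterTest {

    private Connection conn;

    @Before
    public void init() throws Exception {
        // point the client at the ZooKeeper quorum used by the HBase cluster
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        conn = ConnectionFactory.createConnection(conf);
    }
}
```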
- There are many filter types, but they fall into two broad categories: comparison filters and special-purpose filters
- A filter is evaluated on the server side, so only the data that satisfies the condition is returned to the client
9.1 HBase filter comparison operators
LESS <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
9.2 HBase filter comparators (specify the comparison mechanism)
BinaryComparator compares the given byte array lexicographically, byte by byte
BinaryPrefixComparator same as above, but only compares up to the length of the given prefix
NullComparator checks whether the given value is null
BitComparator bitwise comparison
RegexStringComparator provides a regular-expression comparator; only supports EQUAL and NOT_EQUAL
SubstringComparator checks whether the given substring appears in the value
9.3 Filters in practice
9.3.1 Row key prefix filter
- PrefixFilter
```java
public void testFilter1() throws Exception {
    // prefix filter on the row key
    Filter pf = new PrefixFilter(Bytes.toBytes("liu")); // "liu".getBytes()
    testScan(pf);
}

// helper method: takes a filter and prints the matching rows
public void testScan(Filter filter) throws Exception {
    Table table = conn.getTable(TableName.valueOf("t_user_info"));
    Scan scan = new Scan();
    // set the filter on the scan
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    Iterator<Result> iter = scanner.iterator();
    // iterate over all Result objects and print the cells
    while (iter.hasNext()) {
        Result result = iter.next();
        List<Cell> cells = result.listCells();
        for (Cell c : cells) {
            // row key
            byte[] rowBytes = CellUtil.cloneRow(c);
            // column family
            byte[] familyBytes = CellUtil.cloneFamily(c);
            // column qualifier
            byte[] qualifierBytes = CellUtil.cloneQualifier(c);
            // cell value
            byte[] valueBytes = CellUtil.cloneValue(c);
            System.out.print(new String(rowBytes) + " ");
            System.out.print(new String(familyBytes) + ":");
            System.out.print(new String(qualifierBytes) + " ");
            System.out.println(new String(valueBytes));
        }
        System.out.println("-----------------------");
    }
}
```
9.3.2 Row filter
RowFilter
```java
public void testFilter2() throws Exception {
    // a RowFilter takes a comparison operator and a comparator
    RowFilter rf1 = new RowFilter(CompareFilter.CompareOp.LESS,
            new BinaryComparator(Bytes.toBytes("user002")));
    testScan(rf1);

    // rowkeys that contain the substring "01"
    RowFilter rf2 = new RowFilter(CompareFilter.CompareOp.EQUAL,
            new SubstringComparator("01"));
    testScan(rf2);
}
```
9.3.3 Column family filter
FamilyFilter
```java
public void testFilter3() throws Exception {
    // filter on the column family name; the result only contains data from matching column families
    FamilyFilter ff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
            new BinaryComparator(Bytes.toBytes("base_info")));
    // ff1 matches the exact family name; ff2 matches families whose name starts with "base"
    FamilyFilter ff2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
            new BinaryPrefixComparator(Bytes.toBytes("base")));
    testScan(ff2);
}
```
9.3.4 Column qualifier filter
QualifierFilter
```java
public void testFilter4() throws Exception {
    // filter on the column qualifier; the result only contains the matching columns
    QualifierFilter qf1 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
            new BinaryComparator(Bytes.toBytes("password")));
    // qf1 matches the exact qualifier; qf2 matches qualifiers whose name starts with "user"
    QualifierFilter qf2 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
            new BinaryPrefixComparator(Bytes.toBytes("user")));
    testScan(qf2);
}
```
9.3.5 Column value filter
SingleColumnValueFilter
```java
public void testFilter5() throws Exception {
    // filter rows by the value of one specific column
    ByteArrayComparable comparator1 = new RegexStringComparator("^zhang"); // values starting with "zhang"
    ByteArrayComparable comparator2 = new SubstringComparator("si");       // values containing the substring "si"
    // comparator2 is used here; comparator1 is left as an alternative
    SingleColumnValueFilter scvf = new SingleColumnValueFilter("base_info".getBytes(),
            "username".getBytes(), CompareFilter.CompareOp.EQUAL, comparator2);
    testScan(scvf);
}
```
9.3.6 Using multiple filters together
```java
public void testFilter6() throws Exception {
    // combine multiple filters, similar to: select * from t1 where id > 10 and age < 30
    // a column family filter
    FamilyFilter cfff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
            new BinaryPrefixComparator(Bytes.toBytes("base")));
    // a column prefix filter
    ColumnPrefixFilter cfff2 = new ColumnPrefixFilter("password".getBytes());
    // the FilterList operator decides how the filters combine:
    // MUST_PASS_ALL = all filters must match (AND), MUST_PASS_ONE = at least one must match (OR)
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    filterList.addFilter(cfff1);
    filterList.addFilter(cfff2);
    testScan(filterList);
}
```
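Note that the SQL analogy in the comment uses AND, which corresponds to MUST_PASS_ALL rather than MUST_PASS_ONE. A minimal variant that requires every filter to match (the method name testFilter7 is just for illustration):

```java
public void testFilter7() throws Exception {
    // same two filters as above, but now every filter must match (AND semantics)
    FamilyFilter f1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
            new BinaryPrefixComparator(Bytes.toBytes("base")));
    ColumnPrefixFilter f2 = new ColumnPrefixFilter("password".getBytes());
    FilterList andList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    andList.addFilter(f1);
    andList.addFilter(f2);
    testScan(andList);
}
```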
2. Integrating HBase with MapReduce
HBase table data is ultimately stored on HDFS, and HBase has built-in support for MapReduce: a MapReduce job can read HBase table data directly, and it can write its results straight back into an HBase table.
2.1 Hands-on example 1
Requirement
- ==Read the data from one HBase table and write the result into another HBase table==
```java
package com.kaikeba;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class HBaseMR {

    public static class HBaseMapper extends TableMapper<Text, Put> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // row key bytes
            byte[] bytes = key.get();
            String rowkey = Bytes.toString(bytes);
            // build a Put for the same row key
            Put put = new Put(bytes);
            // all cells of the current row
            Cell[] cells = value.rawCells();
            for (Cell cell : cells) {
                // column family f1
                if ("f1".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // column name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                    // column age
                    if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }
            if (!put.isEmpty()) {
                context.write(new Text(rowkey), put);
            }
        }
    }

    public static class HbaseReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(null, put);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Scan scan = new Scan();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseMR.class);
        // use TableMapReduceUtil to initialize the mapper: args[0] is the source table
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf(args[0]), scan,
                HBaseMapper.class, Text.class, Put.class, job);
        // use TableMapReduceUtil to initialize the reducer: args[1] is the target table
        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);
        // number of reduce tasks
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
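Before submitting the job, both tables referenced in the run command below must exist. Assuming the source table t1 already holds data in column family f1 (the family the mapper copies), only the target table still needs to be created in the hbase shell:

```shell
create 't2','f1'
```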
Package it into a jar and submit it to the cluster to run
```shell
hadoop jar hbase_java_api-1.0-SNAPSHOT.jar com.kaikeba.HBaseMR t1 t2
```
2.2 Hands-on example 2
Requirement
- ==Read a file from HDFS and write its contents into an HBase table==
Data file on HDFS: user.txt
```
0001 xiaoming 20
0002 xiaowang 30
0003 xiaowu 40
```
Code
```java
package com.kaikeba;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class Hdfs2Hbase {

    public static class HdfsMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // pass each line through unchanged; the reducer does the parsing
            context.write(value, NullWritable.get());
        }
    }

    public static class HBASEReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // each line looks like: rowkey name age, separated by spaces
            String[] split = key.toString().split(" ");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Hdfs2Hbase.class);
        job.setInputFormatClass(TextInputFormat.class);
        // input file path on HDFS
        TextInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(HdfsMapper.class);
        // map-side output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // the target HBase table name
        TableMapReduceUtil.initTableReducerJob(args[1], HBASEReducer.class, job);
        // number of reduce tasks
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Create the HBase table t3
```shell
create 't3','f1'
```
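The run command below reads the data file from /data/user.txt on HDFS, so user.txt needs to be uploaded there first, for example:

```shell
hdfs dfs -mkdir -p /data
hdfs dfs -put user.txt /data/
```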
Package it into a jar and submit it to the cluster to run
```shell
hadoop jar hbase_java_api-1.0-SNAPSHOT.jar com.kaikeba.Hdfs2Hbase /data/user.txt t3
```
2.3 Hands-on example 3
Requirement
- ==Bulk-load data into an HBase table using bulkload==
Convert the data file at /input/user.txt on HDFS into HFile format, then load it into the target HBase table (t4 in the code below)
Background
There are many ways to load data into HBase: we can write it with the HBase Java API or import it with Sqoop, but those approaches are either slow or tie up Region resources during the import, which hurts efficiency. Alternatively, a MapReduce job can convert the data directly into HBase's underlying storage format, HFile, and the resulting HFiles can then be loaded straight into HBase.
Review of the normal HBase write path
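In the normal write path, a client write is first recorded in the RegionServer's WAL and buffered in the MemStore, and the MemStore is later flushed to HFiles on HDFS.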
Diagram of the bulkload processing flow
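With bulkload, a MapReduce job writes the data directly as HFiles on HDFS, and the finished HFiles are then handed over to the table's regions, bypassing the WAL and MemStore write path entirely.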
Benefits
(1) The import does not consume Region resources
(2) It can import massive amounts of data quickly
(3) It saves memory
==1. Write the code that generates the HFile files==
```java
package com.kaikeba;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HBaseLoad {

    public static class LoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each line looks like: rowkey name age, separated by spaces
            String[] split = value.toString().split(" ");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final String INPUT_PATH = "hdfs://node1:9000/input";
        final String OUTPUT_PATH = "hdfs://node1:9000/output_HFile";
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("t4"));

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseLoad.class);
        job.setMapperClass(LoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // write the output as HFiles via HFileOutputFormat2
        job.setOutputFormatClass(HFileOutputFormat2.class);
        // configure the job so the HFiles match the region layout of table t4
        HFileOutputFormat2.configureIncrementalLoad(job, table,
                connection.getRegionLocator(TableName.valueOf("t4")));
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
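Before running the job, the target table t4 must already exist (configureIncrementalLoad needs its region layout, and the mapper writes to column family f1), and user.txt must be under /input on HDFS to match INPUT_PATH in the code. For example:

```shell
# in the hbase shell
create 't4','f1'
# upload the input file to the path used by INPUT_PATH
hdfs dfs -mkdir -p /input
hdfs dfs -put user.txt /input/
```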
==2. Package it into a jar and submit it to the cluster to run==
```shell
hadoop jar hbase_java_api-1.0-SNAPSHOT.jar com.kaikeba.HBaseLoad
```
==3. Check the output on HDFS==
==4. Load the HFile files into the HBase table==
Loading via code
```java
package com.kaikeba;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LoadData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        // create the connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        // get the Admin object
        Admin admin = connection.getAdmin();
        // get the Table object
        TableName tableName = TableName.valueOf("t4");
        Table table = connection.getTable(tableName);
        // use LoadIncrementalHFiles to load the HFiles into the table
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("hdfs://node1:9000/output_HFile"),
                admin, table, connection.getRegionLocator(tableName));
    }
}
```
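One way to run this loader is from the same jar as the earlier steps, with the HBase jars on the Hadoop classpath (see the export commands under the command-line approach below); the exact invocation depends on your setup:

```shell
hadoop jar hbase_java_api-1.0-SNAPSHOT.jar com.kaikeba.LoadData
```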
Loading via the command line
Command format
```shell
hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable
```
First add the HBase jars to the Hadoop classpath
```shell
export HBASE_HOME=/opt/bigdata/hbase
export HADOOP_HOME=/opt/bigdata/hadoop
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
```
Command-line loading demo
```shell
hadoop jar /opt/bigdata/hbase/lib/hbase-server-1.2.1.jar completebulkload /output_HFile t5
```