[Big Data] Series No. 19: HBase Shell and API CRUD, plus Reading and Writing HBase from MapReduce
By 阿新 · Published 2019-01-01
This post covers:
1. Common HBase shell commands
2. CRUD operations on HBase through the Java API
3. Reading data from HBase in MapReduce, counting value occurrences, and writing the counts back to HBase
Launching the HBase shell
[[email protected] ~]#/home/softs/hbase-0.98.12.1-hadoop2/bin/hbase shell
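Once inside the shell, status and version are standard built-in commands worth running first to confirm the client can actually reach the cluster:

hbase(main):001:0> status    # live/dead region servers and average load
hbase(main):002:0> version   # HBase version the cluster is running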
Create a table:   create 'test', 'cf'                            # 'test' is the table name, 'cf' the column family
Scan a table:     scan 'test'
Insert data:      put 'test', 'row1', 'cf:username', 'value1'    # 'row1' is the unique row identifier, 'username' the column qualifier
List tables:      list                                           # lists table names; use get/scan to read data
Get by rowkey:    get 'test', 'row1'
Drop a table:     disable 'test' then drop 'test'                # a table must be disabled before it can be dropped
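The list above removes data only by dropping the whole table; for deleting individual data the standard shell commands are delete (one cell) and deleteall (a whole row):

delete 'test', 'row1', 'cf:username'    # remove a single cell
deleteall 'test', 'row1'                # remove the entire row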
hbase(main):001:0> scan 'user'
ROW                COLUMN+CELL
 2                 column=col1:count, timestamp=1530214622282, value=1
 userId1           column=col1:age, timestamp=1530201359347, value=2
 userId1           column=col1:name, timestamp=1530203162657, value=xiaohong
 userId1           column=col2:age, timestamp=1530203197562, value=33
 userId1           column=col2:name, timestamp=1530201359347, value=\xE5\xB0\x8F\xE7\xBA\xA2
2 row(s) in 0.3550 seconds

(The value \xE5\xB0\x8F\xE7\xBA\xA2 is the UTF-8 encoding of 小紅; the shell prints non-ASCII bytes as hex escapes.)
Cluster information can also be checked in the HBase master Web UI (on HBase 0.98 it listens on port 60010 of the master by default).
Operating HBase through the Java API
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HbaseCrud {

    HBaseAdmin hbase;
    HTable table;
    String user = "user";
    String col1 = "col1";
    String col2 = "col2";

    @Before
    public void before() throws Exception {
        /* HBaseConfiguration.create() loads hbase-default.xml and hbase-site.xml;
         * a bare new Configuration() would miss the HBase defaults. */
        Configuration configuration = HBaseConfiguration.create();
        /* Point the client at the ZooKeeper quorum so it can locate the cluster. */
        configuration.set("hbase.zookeeper.quorum", "master,node1,node2");
        /* Open the admin and table handles. (String.getBytes() uses the platform
         * default charset; Bytes.toBytes() is the charset-safe HBase idiom.) */
        hbase = new HBaseAdmin(configuration);
        table = new HTable(configuration, user.getBytes());
    }

    @After
    public void end() throws Exception {
        if (hbase != null) {
            hbase.close();
        }
        if (table != null) {
            table.close();
        }
    }

    @Test
    public void createTable() throws Exception {
        if (hbase.tableExists(user.getBytes())) {
            /* A table must be disabled before it can be deleted. */
            hbase.disableTable(user.getBytes());
            hbase.deleteTable(user.getBytes());
        }
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(user));
        /* Column families must be declared when the table is created. */
        HColumnDescriptor columnDescriptor = new HColumnDescriptor(col1.getBytes());
        /* Give this family in-memory priority in the block cache. */
        columnDescriptor.setInMemory(true);
        descriptor.addFamily(columnDescriptor);
        HColumnDescriptor columnDescriptor2 = new HColumnDescriptor(col2.getBytes());
        /* This family stays at normal block-cache priority. */
        columnDescriptor2.setInMemory(false);
        descriptor.addFamily(columnDescriptor2);
        hbase.createTable(descriptor);
    }

    @Test
    public void insertUser() throws Exception {
        /* The rowkey uniquely identifies the row. */
        String rowKey = "userId1";
        Put put = new Put(rowKey.getBytes());
        put.add(col1.getBytes(), "name".getBytes(), "小石頭".getBytes());
        put.add(col1.getBytes(), "age".getBytes(), "2".getBytes());
        put.add(col2.getBytes(), "name".getBytes(), "小紅".getBytes());
        table.put(put);
    }

    @Test
    public void deleteUser() throws Exception {
        /* Delete a single cell of the given rowkey. */
        Delete delete = new Delete("userId1".getBytes());
        delete.deleteColumn(col1.getBytes(), "name".getBytes());
        table.delete(delete);
    }

    @Test
    public void getByUserId() throws Exception {
        /* Point lookup by rowkey. */
        Get get = new Get("userId1".getBytes());
        /* Restrict which columns come back. */
        get.addColumn(col1.getBytes(), "age".getBytes());
        Result result = table.get(get); // a single row
        Cell cell = result.getColumnLatestCell(col1.getBytes(), "age".getBytes());
        System.out.println(new String(CellUtil.cloneValue(cell)));
    }

    @Test
    public void listUsers() throws Exception {
        /*
         * Scan returns multiple rows. Avoid full-table scans where possible:
         * 1. bound the scan with a start and stop rowkey
         * 2. use filters sparingly -- they still read every row in the range
         */
        Scan scan = new Scan();
        scan.setStartRow("userId0".getBytes());
        scan.setStopRow("userId3".getBytes());
        // Filter condition: col1:age == "2"
        SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
                col1.getBytes(), "age".getBytes(),
                CompareFilter.CompareOp.EQUAL, "2".getBytes());
        scan.setFilter(filter1);
        ResultScanner results = table.getScanner(scan);
        results.forEach(result -> {
            System.out.print(new String(result.getValue(col1.getBytes(), "name".getBytes())) + "\t");
            System.out.println(new String(result.getValue(col1.getBytes(), "age".getBytes())));
        });
        results.close(); // release the scanner's server-side resources
    }
}
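The class above uses the 0.98-era client API (HBaseAdmin and the HTable constructor), matching the cluster version in this series; from HBase 1.0 on those entry points are deprecated in favor of ConnectionFactory. A minimal sketch of the newer idiom, assuming the same 'user' table and ZooKeeper quorum (the class name HbaseConnectionSketch is just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class HbaseConnectionSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2");
        /* try-with-resources closes the table, admin and connection handles. */
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             Table table = connection.getTable(TableName.valueOf("user"))) {
            System.out.println("user table exists: "
                    + admin.tableExists(TableName.valueOf("user")));
        }
    }
}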
Integrating HBase with MapReduce: read rows from HBase, count how often each value occurs, and write the counts back
The job class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class WCJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /* Running locally against the remote cluster. */
        conf.set("fs.defaultFS", "hdfs://master:8020");
        conf.set("hbase.zookeeper.quorum", "master,node1,node2");
        Job job = Job.getInstance(conf);
        job.setJarByClass(WCJob.class);
        /* The Scan defines which HBase rows feed the mapper. */
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("user", scan, WCMapper.class,
                Text.class, IntWritable.class, job, false);
        /*
         * The last argument (addDependencyJars) is false because the job runs
         * locally; set it to true when submitting to the cluster.
         */
        TableMapReduceUtil.initTableReducerJob("user", WCReducer.class, job,
                null, null, null, null, false);
        job.waitForCompletion(true);
    }
}
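One tuning note: the empty Scan above uses the client defaults, which are meant for interactive reads rather than batch jobs. The HBase reference guide's usual advice for MapReduce scans looks like this (the caching value of 500 is illustrative, not from the original post):

Scan scan = new Scan();
scan.setCaching(500);        // rows fetched per RPC; larger batches cut round-trips on a full scan
scan.setCacheBlocks(false);  // an MR scan reads each block once, so don't fill the region server block cache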
The mapper class: read each row's col1:age value and emit it with a count of 1
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WCMapper extends TableMapper<Text, IntWritable> {
    /* map() is called once per row returned by the scan. */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        byte[] age = value.getValue("col1".getBytes(), "age".getBytes());
        if (age == null) {
            /* Skip rows without a col1:age cell, such as the count rows the
             * reducer writes back into this same table. */
            return;
        }
        context.write(new Text(new String(age)), new IntWritable(1));
    }
}

The reducer class: sum the counts and write the result back to HBase
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/*
 * Text and IntWritable must match the mapper's output key/value types.
 */
public class WCReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int num = 0;
        for (IntWritable in : values) {
            num++;
        }
        /* Use the age value as the rowkey and write the count back to HBase.
         * Text.getBytes() may return extra bytes past getLength(), so convert
         * through the String instead. */
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add("col1".getBytes(), "count".getBytes(), Bytes.toBytes(String.valueOf(num)));
        context.write(null, put);
    }
}
Checking the results: scanning the table again shows the counts were written back correctly.
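For a quick check from the shell, fetch the count row directly. In the scan output shown earlier, the single userId1 row has col1:age=2, so the job writes a count of 1 under rowkey '2':

hbase(main):002:0> get 'user', '2'
COLUMN                CELL
 col1:count           timestamp=1530214622282, value=1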