
A Brief Look at Using HBase in a Spring Boot Project (with an HBaseUtil Utility Class)

Background:

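Over the past two months our project has started using HBase to read and write data. The ready-made HBase utility classes found online either mix different HBase versions or are demo-level implementations that are incomplete in many respects.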

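I also found that HBase offers many ways to query data. At the top level there are two: Get and Scan. Beyond that, query conditions can combine the rowkey, column family, column name (qualifier), cell value, and timestamp/version, and there is a wide choice of filters as well as coprocessors. So you really need to write your own HBase utility class and implementation based on your project's requirements and your HBase row/column design.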
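To make the Get/Scan distinction concrete, here is a toy in-memory model. It is plain Java, not the HBase client API, and the class name `ToyTable` is hypothetical. It keeps rows, families, and qualifiers in sorted maps, the way HBase keeps data sorted by key, and contrasts a point lookup (Get) with a range walk plus row filter (Scan).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Predicate;

// Toy in-memory model of an HBase table (hypothetical, NOT the HBase client API):
// rowkey -> column family -> qualifier -> value, every level kept sorted.
// The real Result.getMap() has one more level (timestamp -> value), omitted here.
class ToyTable {
    private final NavigableMap<String, NavigableMap<String, NavigableMap<String, String>>> rows = new TreeMap<>();

    void put(String row, String family, String qualifier, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>())
            .computeIfAbsent(family, f -> new TreeMap<>())
            .put(qualifier, value);
    }

    // "Get": point lookup by exact rowkey, narrowed to one family/qualifier.
    String get(String row, String family, String qualifier) {
        NavigableMap<String, NavigableMap<String, String>> families = rows.get(row);
        if (families == null) return null;
        NavigableMap<String, String> columns = families.get(family);
        return columns == null ? null : columns.get(qualifier);
    }

    // "Scan": walk the sorted rowkeys in [startRow, stopRow) and keep rows accepted by the row filter.
    List<String> scan(String startRow, String stopRow, Predicate<String> rowFilter) {
        List<String> hits = new ArrayList<>();
        for (String row : rows.subMap(startRow, true, stopRow, false).keySet()) {
            if (rowFilter.test(row)) hits.add(row);
        }
        return hits;
    }
}
```

A Get touches exactly one row, while a Scan's cost grows with the size of the key range it walks, which is why rowkey design matters so much for query performance.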

After researching and organizing this myself, I'd like to share my initial implementation here.

Note: HBase version 1.3.0 (CDH 5.13.0), Spring Boot version 1.5.9.

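Note that I use the native HBase client API rather than a Spring/Spring Boot integration such as HbaseTemplate, because there is little documentation on it and I have heard it is not that convenient to use.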

1. pom.xml dependencies

<dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>1.3.0</version>
 <exclusions>
  <exclusion>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
  </exclusion>
  <exclusion>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
  </exclusion>
  <exclusion>
   <groupId>javax.servlet</groupId>
   <artifactId>servlet-api</artifactId>
  </exclusion>
 </exclusions>
</dependency>

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-common</artifactId>
 <version>2.6.0</version>
</dependency>

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-mapreduce-client-core</artifactId>
 <version>2.6.0</version>
</dependency>

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-mapreduce-client-common</artifactId>
 <version>2.6.0</version>
</dependency>

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-hdfs</artifactId>
 <version>2.6.0</version>
</dependency>

2. application.yml configuration

Here I define custom HBase properties; a dedicated configuration class (next section) loads them.

hbase:
  conf:
    confMaps:
      'hbase.zookeeper.quorum' : 'cdh1:2181,cdh2:2181,cdh3:2181'

3. HbaseConfig custom configuration class

HbaseConfig.java:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Map;

/**
 * HBase configuration, bound from the hbase.conf.* properties
 *
 * @Author: yuanj
 * @Date: 2018/10/12 10:49
 */
@Configuration
@ConfigurationProperties(prefix = HbaseConfig.CONF_PREFIX)
public class HbaseConfig {

 public static final String CONF_PREFIX = "hbase.conf";

 private Map<String,String> confMaps;

 public Map<String,String> getconfMaps() {
  return confMaps;
 }
 public void setconfMaps(Map<String,String> confMaps) {
  this.confMaps = confMaps;
 }
}

If you are not familiar with the @ConfigurationProperties annotation, it is worth looking up: it binds properties from application.yml to the fields of the annotated class.

In other words, once the Spring Boot application has started, confMaps already holds an entry whose key is hbase.zookeeper.quorum and whose value is cdh1:2181,cdh2:2181,cdh3:2181.

4. HBaseUtils utility class

First, add a SpringContextHolder utility class, which is used below:

package com.moerlong.credit.core;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Holder for Spring's ApplicationContext, so that beans in the Spring container can be retrieved through static methods
 */
@Component
public class SpringContextHolder implements ApplicationContextAware {

 private static ApplicationContext applicationContext;

 @Override
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  SpringContextHolder.applicationContext = applicationContext;
 }

 public static ApplicationContext getApplicationContext() {
  assertApplicationContext();
  return applicationContext;
 }

 @SuppressWarnings("unchecked")
 public static <T> T getBean(String beanName) {
  assertApplicationContext();
  return (T) applicationContext.getBean(beanName);
 }

 public static <T> T getBean(Class<T> requiredType) {
  assertApplicationContext();
  return applicationContext.getBean(requiredType);
 }

 private static void assertApplicationContext() {
  if (SpringContextHolder.applicationContext == null) {
   throw new RuntimeException("applicationContext is null; check that SpringContextHolder has been registered as a Spring bean!");
  }
 }

}
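HBaseUtils (below) reads the hbaseConfig bean through SpringContextHolder in a static field initializer, so the holder must be populated before HBaseUtils is instantiated; that ordering is what `@DependsOn("springContextHolder")` asks Spring for. The plain-Java sketch below (hypothetical class names, no Spring involved) shows the failure mode that ordering prevents: a class whose static initializer reads an empty holder fails to initialize.

```java
import java.util.Map;

// Hypothetical stand-in for SpringContextHolder: a static registry that is empty until init() runs.
final class StaticHolder {
    private static Map<String, Object> beans;

    static void init(Map<String, Object> registry) { beans = registry; }

    @SuppressWarnings("unchecked")
    static <T> T getBean(String name) {
        if (beans == null) {
            // Same fail-fast idea as assertApplicationContext()
            throw new IllegalStateException("holder not initialized yet");
        }
        return (T) beans.get(name);
    }
}

// Hypothetical consumer that, like HBaseUtils, reads the holder while its class is being initialized.
final class EagerConsumer {
    static final Object CONFIG = StaticHolder.getBean("hbaseConfig");
}
```

If EagerConsumer is touched before `init()`, the JVM wraps the IllegalStateException in an ExceptionInInitializerError and the class stays unusable afterwards, so getting the creation order right matters.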

HBaseUtils.java:

import com.moerlong.credit.config.HbaseConfig;
import com.moerlong.credit.core.SpringContextHolder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@DependsOn("springContextHolder") // control initialization order: make sure springContextHolder is created first
@Component
public class HBaseUtils {

 private static final Logger logger = LoggerFactory.getLogger(HBaseUtils.class);

 // Fetch the HbaseConfig bean manually through SpringContextHolder (static fields cannot be @Autowired)
 private static HbaseConfig hbaseConfig = SpringContextHolder.getBean("hbaseConfig");

 private static Configuration conf = HBaseConfiguration.create();
 // Thread pool the Connection uses for batch operations (a thread pool, not a connection pool)
 private static ExecutorService pool = Executors.newFixedThreadPool(20);
 private static Connection connection = null;
 private static HBaseUtils instance = null;
 private static Admin admin = null;

 private HBaseUtils(){
  if(connection == null){
   try {
    // Copy the properties defined in HbaseConfig into the HBase Configuration used to create the connection
    Map<String,String> confMap = hbaseConfig.getconfMaps();
    for (Map.Entry<String,String> confEntry : confMap.entrySet()) {
     conf.set(confEntry.getKey(),confEntry.getValue());
    }
    connection = ConnectionFactory.createConnection(conf,pool);
    admin = connection.getAdmin();
   } catch (IOException e) {
    logger.error("Failed to initialize HBaseUtils: " + e.getMessage(),e);
   }
  }
 } 
 // Simple singleton accessor; not needed if the bean is injected with @Autowired
 public static synchronized HBaseUtils getInstance(){
  if(instance == null){
   instance = new HBaseUtils();
  }
  return instance;
 }

 /**
  * Create a table (an existing table with the same name is dropped and recreated)
  *
  * @param tableName   table name
  * @param columnFamily  column families (array)
  */
 public void createTable(String tableName,String[] columnFamily) throws IOException{
  TableName name = TableName.valueOf(tableName);
  // If the table already exists, drop it first, then create it again below
  if (admin.tableExists(name)) {
   logger.warn("table {} already exists, dropping and recreating it", name);
   admin.disableTable(name);
   admin.deleteTable(name);
  }
  HTableDescriptor desc = new HTableDescriptor(name);
  for (String cf : columnFamily) {
   desc.addFamily(new HColumnDescriptor(cf));
  }
  admin.createTable(desc);
 }

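 /**
  * Insert a record (one row, one column family, multiple columns/values)
  *
  * @param tableName   table name
  * @param row    rowkey
  * @param columnFamilys  column family name
  * @param columns   column names (array)
  * @param values   values (array, one per column, in the same order)
  */
 public void insertRecords(String tableName,String row,String columnFamilys,String[] columns,String[] values) throws IOException {
  if (columns.length != values.length) {
   throw new IllegalArgumentException("columns and values must have the same length");
  }
  TableName name = TableName.valueOf(tableName);
  try (Table table = connection.getTable(name)) {
   Put put = new Put(Bytes.toBytes(row));
   for (int i = 0; i < columns.length; i++) {
    put.addColumn(Bytes.toBytes(columnFamilys),Bytes.toBytes(columns[i]),Bytes.toBytes(values[i]));
   }
   // One write for the whole row instead of one put() per column
   table.put(put);
  }
 }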

 /**
  * Insert a record (one row, one column family, one column/value)
  *
  * @param tableName   table name
  * @param row    rowkey
  * @param columnFamily  column family name
  * @param column   column name
  * @param value    value
  */
 public void insertOneRecord(String tableName,String row,String columnFamily,String column,String value) throws IOException {
  TableName name = TableName.valueOf(tableName);
  try (Table table = connection.getTable(name)) {
   Put put = new Put(Bytes.toBytes(row));
   put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
   table.put(put);
  }
 }

 /**
  * Delete a whole row
  *
  * @param tablename   table name
  * @param rowkey   rowkey
  */
 public void deleteRow(String tablename,String rowkey) throws IOException {
  TableName name = TableName.valueOf(tablename);
  try (Table table = connection.getTable(name)) {
   table.delete(new Delete(Bytes.toBytes(rowkey)));
  }
 }

 /**
  * Delete one column family of one row
  * @param tablename   table name
  * @param rowkey   rowkey
  * @param columnFamily  column family name
  */
 public void deleteColumnFamily(String tablename,String rowkey,String columnFamily) throws IOException {
  TableName name = TableName.valueOf(tablename);
  try (Table table = connection.getTable(name)) {
   // Removes all versions of all columns in this family
   Delete d = new Delete(Bytes.toBytes(rowkey)).addFamily(Bytes.toBytes(columnFamily));
   table.delete(d);
  }
 }

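 /**
  * Delete one column of one column family in one row
  *
  * @param tablename   table name
  * @param rowkey   rowkey
  * @param columnFamily  column family name
  * @param column   column name
  */
 public void deleteColumn(String tablename,String rowkey,String columnFamily,String column) throws IOException {
  TableName name = TableName.valueOf(tablename);
  try (Table table = connection.getTable(name)) {
   // addColumn deletes only the latest version of the column; use addColumns to delete all versions
   Delete d = new Delete(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column));
   table.delete(d);
  }
 }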

 /**
  * Read a whole row
  *
  * @param tablename   table name
  * @param rowKey   rowkey
  * @return one tab-separated line per cell (row, family, qualifier, value), or "" if the row does not exist
  */
 public static String selectRow(String tablename,String rowKey) throws IOException {
  StringBuilder record = new StringBuilder();
  TableName name = TableName.valueOf(tablename);
  try (Table table = connection.getTable(name)) {
   Result rs = table.get(new Get(Bytes.toBytes(rowKey)));
   // rawCells() can be null for an empty Result, so check first
   if (rs.isEmpty()) {
    return "";
   }
   for (Cell cell : rs.rawCells()) {
    record.append(Bytes.toString(CellUtil.cloneRow(cell))).append("\t")
      .append(Bytes.toString(CellUtil.cloneFamily(cell))).append("\t")
      .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append("\t")
      .append(Bytes.toString(CellUtil.cloneValue(cell))).append("\n");
   }
  }
  return record.toString();
 }

 /**
  * Read one column of one column family in one row
  *
  * @param tablename   table name
  * @param rowKey   rowkey
  * @param columnFamily  column family name
  * @param column   column name
  * @return the value as a string, or null if the cell does not exist
  */
 public static String selectValue(String tablename,String rowKey,String columnFamily,String column) throws IOException {
  TableName name = TableName.valueOf(tablename);
  try (Table table = connection.getTable(name)) {
   Get g = new Get(Bytes.toBytes(rowKey));
   g.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column));
   Result rs = table.get(g);
   return Bytes.toString(rs.value());
  }
 }

 /**
  * Read all rows of a table (Scan)
  *
  * @param tablename
  * @return
  */
 public String scanAllRecord(String tablename) throws IOException {
  StringBuilder record = new StringBuilder();
  TableName name = TableName.valueOf(tablename);
  // try-with-resources closes both the scanner and the table
  try (Table table = connection.getTable(name);
    ResultScanner scanner = table.getScanner(new Scan())) {
   for (Result result : scanner) {
    for (Cell cell : result.rawCells()) {
     record.append(Bytes.toString(CellUtil.cloneRow(cell))).append("\t")
       .append(Bytes.toString(CellUtil.cloneFamily(cell))).append("\t")
       .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append("\t")
       .append(Bytes.toString(CellUtil.cloneValue(cell))).append("\n");
    }
   }
  }
  return record.toString();
 }

 /**
  * Query report records whose rowkey contains a keyword
  *
  * @param tablename
  * @param rowKeyword
  * @return
  */
 public List<Result> scanReportDataByRowKeyword(String tablename,String rowKeyword) throws IOException {
  List<Result> list = new ArrayList<>();
  Scan scan = new Scan();

  // Row filter: keep rows whose rowkey contains the keyword (case-insensitive; the whole table is still scanned)
  RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator(rowKeyword));
  scan.setFilter(rowFilter);

  try (Table table = connection.getTable(TableName.valueOf(tablename));
    ResultScanner scanner = table.getScanner(scan)) {
   for (Result result : scanner) {
    //TODO convert each Result into a business object as needed
    list.add(result);
   }
  }
  return list;
 }

 /**
  * Query report records whose rowkey contains a keyword, within a timestamp range
  *
  * @param tablename
  * @param rowKeyword
  * @param minStamp  lower bound of the timestamp range (inclusive)
  * @param maxStamp  upper bound of the timestamp range (exclusive)
  * @return
  */
 public List<Result> scanReportDataByRowKeywordTimestamp(String tablename,String rowKeyword,Long minStamp,Long maxStamp) throws IOException {
  List<Result> list = new ArrayList<>();
  Scan scan = new Scan();
  // Limit the scan to cells with minStamp <= timestamp < maxStamp
  scan.setTimeRange(minStamp,maxStamp);

  RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator(rowKeyword));
  scan.setFilter(rowFilter);

  try (Table table = connection.getTable(TableName.valueOf(tablename));
    ResultScanner scanner = table.getScanner(scan)) {
   for (Result result : scanner) {
    //TODO convert each Result into a business object as needed
    list.add(result);
   }
  }
  return list;
 }

 /**
  * Drop a table
  *
  * @param tablename
  */
 public void deleteTable(String tablename) throws IOException {
  TableName name=TableName.valueOf(tablename);
  if(admin.tableExists(name)) {
   admin.disableTable(name);
   admin.deleteTable(name);
  }
 }

 /**
  * Count all rows of a table using the aggregation coprocessor
  *
  * @param tablename
  */
 public Long countRowsWithCoprocessor(String tablename) throws Throwable {
  TableName name=TableName.valueOf(tablename);
  HTableDescriptor descriptor = admin.getTableDescriptor(name);

  String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
  if (! descriptor.hasCoprocessor(coprocessorClass)) {
   admin.disableTable(name);
   descriptor.addCoprocessor(coprocessorClass);
   admin.modifyTable(name,descriptor);
   admin.enableTable(name);
  }

  // Time the count
  StopWatch stopWatch = new StopWatch();
  stopWatch.start();

  Scan scan = new Scan();
  AggregationClient aggregationClient = new AggregationClient(conf);

  Long count = aggregationClient.rowCount(name,new LongColumnInterpreter(),scan);

  stopWatch.stop();
  logger.info("RowCount: {}, full-table count took {} ms", count, stopWatch.getTotalTimeMillis());

  return count;
 }
}
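The two scan methods above select rows with RowFilter + SubstringComparator and, in the second case, a time range. The plain-Java sketch below (not HBase code; `ScanSemantics` is a hypothetical name, and ASCII rowkeys are assumed so that String order matches HBase's byte order) illustrates what those choices mean: a substring filter must examine every row and matches case-insensitively, a prefix scan such as `Scan.setRowPrefixFilter` only reads the contiguous block of matching keys, and `setTimeRange` treats its upper bound as exclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

// Plain-Java illustration (hypothetical, not HBase code) of how rows and cells are selected.
final class ScanSemantics {
    // Like RowFilter + SubstringComparator: every rowkey is examined, and the match is
    // case-insensitive because the comparator lower-cases both sides. examined[0] counts rows touched.
    static List<String> substringMatch(NavigableSet<String> rowkeys, String keyword, int[] examined) {
        String needle = keyword.toLowerCase();
        List<String> hits = new ArrayList<>();
        for (String row : rowkeys) {
            examined[0]++;
            if (row.toLowerCase().contains(needle)) hits.add(row);
        }
        return hits;
    }

    // Like a prefix scan: start at the prefix and stop at the first key past it,
    // because keys sharing a prefix are contiguous in sorted order.
    static List<String> prefixRange(NavigableSet<String> rowkeys, String prefix, int[] examined) {
        List<String> hits = new ArrayList<>();
        for (String row : rowkeys.tailSet(prefix, true)) {
            examined[0]++;
            if (!row.startsWith(prefix)) break;
            hits.add(row);
        }
        return hits;
    }

    // Like Scan.setTimeRange(minStamp, maxStamp): minStamp <= ts < maxStamp (upper bound exclusive).
    static boolean inTimeRange(long ts, long minStamp, long maxStamp) {
        return ts >= minStamp && ts < maxStamp;
    }
}
```

In practice a keyword filter on rowkeys costs a full table scan, so if the keyword is really a rowkey prefix, a prefix scan reads far less data. To include `maxStamp` itself in the time range, pass `maxStamp + 1`.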

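deleteColumn above deletes only the newest version of a column, so in a column family that retains more than one version, an older value can become visible again after the delete. The toy model below (hypothetical class, plain Java, assuming such a multi-version family) contrasts the single-version delete with deleting all versions.

```java
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model (hypothetical, not HBase code) of one column's versions, newest first as HBase returns them.
final class VersionedColumn {
    private final NavigableMap<Long, String> versions = new TreeMap<>(Comparator.<Long>reverseOrder());

    void put(long ts, String value) { versions.put(ts, value); }

    // Like Delete.addColumn(family, qualifier): remove only the newest version.
    void deleteLatest() { versions.pollFirstEntry(); }

    // Like Delete.addColumns(family, qualifier): remove every version.
    void deleteAll() { versions.clear(); }

    // A plain Get returns the newest remaining version, or null if none is left.
    String read() {
        return versions.isEmpty() ? null : versions.firstEntry().getValue();
    }
}
```

If the goal is "this column should no longer exist", the all-versions variant is usually the safer choice.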
5. Usage

Now you only need to inject HBaseUtils into your business classes to use it:

@Autowired
private HBaseUtils hBaseUtils;
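For code outside Spring that calls `HBaseUtils.getInstance()` instead, the synchronized accessor takes a lock on every call. A standard Java alternative, swapped in here as a sketch (the class name `LazySingleton` is hypothetical), is the initialization-on-demand holder idiom, which gives the same lazy, thread-safe single instance without that lock.

```java
// Initialization-on-demand holder idiom (hypothetical class name).
final class LazySingleton {
    static int constructed = 0; // demo-only counter of constructor calls

    private LazySingleton() { constructed++; }

    private static final class Holder {
        // The JVM initializes Holder exactly once, on first access, under its class-initialization lock.
        static final LazySingleton INSTANCE = new LazySingleton();
    }

    static LazySingleton getInstance() { return Holder.INSTANCE; }
}
```

With `@Component` plus `@Autowired`, as shown above, Spring already manages a single instance, so neither accessor is needed there.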

Supplementary: integrating HBase with Spring Boot

The Spring Boot project in this example also integrates Spring Cloud.

Dependency:

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-shaded-client</artifactId>
      <version>1.2.6</version>
    </dependency>

yml configuration:

A custom property holds the ZooKeeper address:

hbase:
  zookeeper:
    quorum: hbase126-node2:2181

Configuration class:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

/**
* @Author wangqiubao
* @Date 2019/9/24 15:28
* @Description
**/
@Configuration
public class UcareHbaseConfiguration {
  /**
   * ZooKeeper quorum of the HBase cluster
   */
  @Value("${hbase.zookeeper.quorum}")
  private String quorum;

  /**
   * HBase connection settings
   *
   * @return
   */
  @Bean
  public org.apache.hadoop.conf.Configuration hbaseConfig() {
    org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
    config.set(HConstants.ZOOKEEPER_QUORUM,quorum);
    return config;
  }
  // Every call to get() creates a new Connection
  @Bean
  public Supplier<Connection> hbaseConnSupplier() {
    return () -> {
      try {
        return hbaseConnection();
      } catch (IOException e) {
        // Wrap the checked IOException so it can escape the Supplier
        throw new UncheckedIOException(e);
      }
    };
  }

  @Bean
  // @Scope sets the bean scope: singleton by default, prototype creates a new instance per lookup.
  // If another class injects this bean with @Autowired, prototype has no further effect: the bean is created
  // once when that class is initialized and reused afterwards. To get a new instance each time, look the bean up manually on every call.
  @Scope(value = "prototype")
  public Connection hbaseConnection() throws IOException {
    return ConnectionFactory.createConnection(hbaseConfig());
  }
}
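The hbaseConnSupplier bean above opens a brand-new Connection on every `get()`. An HBase Connection is heavyweight and thread-safe, and is normally created once and shared. If lazy creation is still wanted, one option is a memoizing Supplier; below is a minimal generic sketch (class name `Memoized` is hypothetical) using double-checked locking on a volatile field. Guava's `Suppliers.memoize` provides similar behavior.

```java
import java.util.function.Supplier;

// Hypothetical helper: calls the delegate at most once, then returns the cached value (thread-safe).
final class Memoized<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private volatile T value;

    Memoized(Supplier<T> delegate) { this.delegate = delegate; }

    @Override
    public T get() {
        T v = value;              // fast path: no lock once initialized
        if (v == null) {
            synchronized (this) {
                v = value;        // re-check under the lock
                if (v == null) {
                    v = delegate.get();
                    value = v;
                }
            }
        }
        return v;
    }
}
```

Whichever variant is used, remember that Spring does not call destroy callbacks on prototype-scoped beans, so every Connection obtained through the prototype bean or the supplier must be closed by the caller.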

Usage

Managed by Spring:

  /**
   * Thread-safe; the Connection manages its connections to the cluster internally
   */
  @Autowired
  private Connection hbaseConnection;

Insert/update data

public void putRow() throws IOException {
  try (Table table = hbaseConnection.getTable(TableName.valueOf("table_name"))) { // get the table
    // Build one row
    // rowkey
    Put put = new Put(Bytes.toBytes("row_key"));
    put.addColumn(Bytes.toBytes("column_family"),Bytes.toBytes("column"),Bytes.toBytes("value"));
    // ...one addColumn call for every column that has data
    // Write the row with put
    table.put(put);
  }
}

Query

Fetch content by rowkey:

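try (Table table = hbaseConnection.getTable(TableName.valueOf("table_name"))) {
  // asRowKey(date, acid) is a project helper (not shown) that builds the rowkey
  Result result = table.get(new Get(asRowKey(date,acid)));
  // table.get never returns null; a missing row yields an empty Result
  if (result.isEmpty()) return null;

  // The column names are start times, so the last cell is the flight's latest track
  // (Iterables is from Guava; AdsbTrackProto is the project's protobuf message class, not shown)
  Cell latestCell = Iterables.getLast(result.listCells());
  return AdsbTrackProto.AdsbTrack.parseFrom(CellUtil.cloneValue(latestCell));
}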
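`Iterables.getLast` returns the last cell of the Result, and within one column family the cells are sorted by qualifier bytes (unsigned, lexicographic). "Last equals latest" therefore only holds if the start time is encoded so that byte order matches time order. A plain-Java check, assuming (not confirmed by the code above) that the qualifier is written with `Bytes.toBytes(long)`, which produces 8 big-endian bytes, and that timestamps are non-negative; the class name is hypothetical:

```java
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical helper showing why fixed-width big-endian qualifiers sort in time order.
final class QualifierOrder {
    // 8-byte big-endian encoding, the same layout Bytes.toBytes(long) produces.
    static byte[] encode(long ts) { return ByteBuffer.allocate(Long.BYTES).putLong(ts).array(); }

    static long decode(byte[] b) { return ByteBuffer.wrap(b).getLong(); }

    // Unsigned lexicographic byte comparison, the order HBase uses for qualifiers.
    static final Comparator<byte[]> UNSIGNED_LEX = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    };

    // Largest timestamp among the qualifiers, i.e. what "take the last cell" yields under this order.
    static long latest(long... timestamps) {
        TreeMap<byte[], Boolean> qualifiers = new TreeMap<>(UNSIGNED_LEX);
        for (long ts : timestamps) qualifiers.put(encode(ts), Boolean.TRUE);
        return decode(qualifiers.lastKey());
    }
}
```

By contrast, variable-width decimal strings do not sort numerically ("9" sorts after "10"), so storing start times as plain strings without zero-padding would make the last cell the wrong one.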
