1. 程式人生 > >HBase優化策略及協處理器

HBase優化策略及協處理器

服務端優化

  1. 什麼導致HBase效能下降

    • Jvm記憶體分配與GC回收策略

    • HBase執行機制相關的配置分配不合理

    • 表結構設計及使用者使用方式不合理

  2. HBase資料儲存過程

    • HBase寫入時當memstore達到一定大小會flush到磁碟儲存成HFile,當HFile小檔案太多回執行compact操作進行合併(compact就是將很多小檔案合併成一個大檔案的過程。compact分為minor compaction和major compaction)

    • 當Region的大小達到某一閾值之後,回執行split操作

當HBase做compact和split操作時需要優化

  1. 常見服務端配置優化

    • Jvm設定與GC設定

    • hbase-site.xml部分屬性配置

    屬性 簡介 建議
    hbase.regionserver.handler.count rpc請求的執行緒數量,預設值是10
    hbase.hregion.max.filesize 當region的大小大於設定值後hbase就會開始split 建議手動split
    hbase.hregion.majorcompaction major compaction的執行週期 將值設定成0,在業務低峰手動執行majorcompaction
    hbase.hstore.compaction.min 一個store裡的storefile總數超過該值,會觸發預設的合併操作 預設3
    hbase.hstore.compaction.max 一次合併最多合併多少個storeFile
    hbase.hstore.blockingStoreFiles 一個region中的Store(CoulmnFamily)內有超過多少個storeFile時,則block所有的寫請求進行compaction
    hfile.block.cache.size regionserver的block cache的記憶體大小限制 在偏向讀的業務中可調大該值
    hbase.hregion.memstore.flush.size memstore超過該值將被flush
    hbase.hregion.memstore.block.multiplier 如果memstore的記憶體大小超過flush.size*multiplier,會阻塞該memstore的寫操作 建議設定成5,設定太大會有記憶體溢位的風險

常用優化策略

  1. 預先分割槽

    • HBase預設建立表的時候會自動建立一個Region分割槽

    • 建立表時預先建立一些空的Region,並指定Rowkey的儲存範圍。這樣可以減少Split操作,減少IO操作

  2. Rowkey優化

    • 利用HBase預設排序特點,將一起訪問的資料放到一起

    • 防止熱點問題,避免使用時序或者單調的遞增遞減等

  3. Column優化

    • 列族的名稱和列的描述命令儘量簡單

    • 同一張表的列族不要超過三個

讀寫優化

  1. 寫優化策略

    • 同步批量提交(預設)

    • 非同步批量提交 (會提升效能,但可能存在資料丟失,在一些業務中可以採用)

    • WAL優化,是否必須開啟(預設開啟),持久化等級

  2. 讀優化策略

    • 客戶端:Scan快取設定,批量獲取

    • 服務端:BlockCache配置是否合理,HFile是否過多(通過服務端的配置進行設定)

    • 表結構設計問題

HBase協處理器

  1. 協處理器簡介

HBase協處理器受BigTable協處理器的啟發,為使用者提供類庫和執行時環境,使得程式碼能夠在HBase RegionServer和Master上處理(使用協處理器需要謹慎,可能會導致效能下降甚至資料丟失)

協處理分為系統協處理器 and 表協處理器

  • 系統協處理器:全域性載入到RegionServer託管的所有表和Region上(是屬於所有的表)

  • 表協處理器:使用者可以指定一張表使用協處理器(只是針對一張表)

  • 觀察者(Observer):類似於關係資料庫的觸發器

  • 終端(Endpoint):動態的終端有點像儲存過程

2.實現一個ResionObserver型別的協處理器

引入pom:

<dependencies>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>1.2.4</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>1.2.4</version>
</dependency>

一個簡單demo

package com.imooc.bigdata.hbase.coprocessor.observer;

import java.awt.image.ImagingOpException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;



/**
 * Created by xt on 18-6-18.
 */
public class RegionObserverTest extends BaseRegionObserver {

  private byte[] columnFamily = Bytes.toBytes("cf");
  private byte[] countCol = Bytes.toBytes("countCol");
  private byte[] unDeleteCol = Bytes.toBytes("unDeleteCol");
  private RegionCoprocessorEnvironment environment;

  //regionserver 開啟region前執行
  @Override
  public void start(CoprocessorEnvironment e) throws IOException {
    environment = (RegionCoprocessorEnvironment) e;
  }

  //RegionServer關閉region前呼叫
  @Override
  public void stop(CoprocessorEnvironment e) throws IOException {

  }

  /**
   * 1. cf:countCol 進行累加操作。 每次插入的時候都要與之前的值進行相加
   */

  @Override
  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
      Durability durability) throws IOException {
    if (put.has(columnFamily, countCol)) {
      //獲取old countcol value
      Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow()));
      int oldNum = 0;
      for (Cell cell : rs.rawCells()) {
        if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
          oldNum = Integer.valueOf(Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }

      //獲取new countcol value
      List<Cell> cells = put.get(columnFamily, countCol);
      int newNum = 0;
      for (Cell cell : cells) {
        if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
          newNum = Integer.valueOf(Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }

      //sum AND update Put例項
      put.addColumn(columnFamily, countCol, Bytes.toBytes(String.valueOf(oldNum + newNum)));
    }
  }

  /**
   * 2. 不能直接刪除unDeleteCol    刪除countCol的時候將unDeleteCol一同刪除
   */
  @Override
  public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
      WALEdit edit,
      Durability durability) throws IOException {
    //判斷是否操作cf列族
    List<Cell> cells = delete.getFamilyCellMap().get(columnFamily);
    if (cells == null || cells.size() == 0) {
      return;
    }

    boolean deleteFlag = false;
    for (Cell cell : cells) {
      byte[] qualifier = CellUtil.cloneQualifier(cell);

      if (Arrays.equals(qualifier, unDeleteCol)) {
        throw new IOException("can not delete unDel column");
      }

      if (Arrays.equals(qualifier, countCol)) {
        deleteFlag = true;
      }
    }

    if (deleteFlag) {
      delete.addColumn(columnFamily, unDeleteCol);
    }
  }
}

3.實現一個Endpoint型別的協處理器

待續。。。。。。