HBase優化策略及協處理器
服務端優化
什麼導致HBase效能下降
Jvm記憶體分配與GC回收策略
HBase執行機制相關的配置分配不合理
表結構設計及使用者使用方式不合理
HBase資料儲存過程
HBase寫入時當memstore達到一定大小會flush到磁碟儲存成HFile,當HFile小檔案太多回執行compact操作進行合併(compact就是將很多小檔案合併成一個大檔案的過程。compact分為minor compaction和major compaction)
當Region的大小達到某一閾值之後,回執行split操作
當HBase做compact和split操作時需要優化
常見服務端配置優化
Jvm設定與GC設定
hbase-site.xml部分屬性配置
屬性 簡介 建議 hbase.regionserver.handler.count rpc請求的執行緒數量,預設值是10 hbase.hregion.max.filesize 當region的大小大於設定值後hbase就會開始split 建議手動split hbase.hregion.majorcompaction major compaction的執行週期 將值設定成0,在業務低峰手動執行majorcompaction hbase.hstore.compaction.min 一個store裡的storefile總數超過該值,會觸發預設的合併操作 預設3 hbase.hstore.compaction.max 一次合併最多合併多少個storeFile hbase.hstore.blockingStoreFiles 一個region中的Store(CoulmnFamily)內有超過多少個storeFile時,則block所有的寫請求進行compaction hfile.block.cache.size regionserver的block cache的記憶體大小限制 在偏向讀的業務中可調大該值 hbase.hregion.memstore.flush.size memstore超過該值將被flush hbase.hregion.memstore.block.multiplier 如果memstore的記憶體大小超過flush.size*multiplier,會阻塞該memstore的寫操作 建議設定成5,設定太大會有記憶體溢位的風險
常用優化策略
預先分割槽
HBase預設建立表的時候會自動建立一個Region分割槽
建立表時預先建立一些空的Region,並指定Rowkey的儲存範圍。這樣可以減少Split操作,減少IO操作
Rowkey優化
利用HBase預設排序特點,將一起訪問的資料放到一起
防止熱點問題,避免使用時序或者單調的遞增遞減等
Column優化
列族的名稱和列的描述命令儘量簡單
同一張表的列族不要超過三個
讀寫優化
寫優化策略
同步批量提交(預設)
非同步批量提交 (會提升效能,但可能存在資料丟失,在一些業務中可以採用)
WAL優化,是否必須開啟(預設開啟),持久化等級
讀優化策略
客戶端:Scan快取設定,批量獲取
服務端:BlockCache配置是否合理,HFile是否過多(通過服務端的配置進行設定)
表結構設計問題
HBase協處理器
- 協處理器簡介
HBase協處理器受BigTable協處理器的啟發,為使用者提供類庫和執行時環境,使得程式碼能夠在HBase RegionServer和Master上處理(使用協處理器需要謹慎,可能會導致效能下降甚至資料丟失)
協處理分為系統協處理器 and 表協處理器
系統協處理器:全域性載入到RegionServer託管的所有表和Region上(是屬於所有的表)
表協處理器:使用者可以指定一張表使用協處理器(只是針對一張表)
觀察者(Observer):類似於關係資料庫的觸發器
終端(Endpoint):動態的終端有點像儲存過程
2.實現一個ResionObserver型別的協處理器
引入pom:
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
一個簡單demo
package com.imooc.bigdata.hbase.coprocessor.observer;
import java.awt.image.ImagingOpException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Created by xt on 18-6-18.
*/
public class RegionObserverTest extends BaseRegionObserver {
private byte[] columnFamily = Bytes.toBytes("cf");
private byte[] countCol = Bytes.toBytes("countCol");
private byte[] unDeleteCol = Bytes.toBytes("unDeleteCol");
private RegionCoprocessorEnvironment environment;
//regionserver 開啟region前執行
@Override
public void start(CoprocessorEnvironment e) throws IOException {
environment = (RegionCoprocessorEnvironment) e;
}
//RegionServer關閉region前呼叫
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
}
/**
* 1. cf:countCol 進行累加操作。 每次插入的時候都要與之前的值進行相加
*/
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
Durability durability) throws IOException {
if (put.has(columnFamily, countCol)) {
//獲取old countcol value
Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow()));
int oldNum = 0;
for (Cell cell : rs.rawCells()) {
if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
oldNum = Integer.valueOf(Bytes.toString(CellUtil.cloneValue(cell)));
}
}
//獲取new countcol value
List<Cell> cells = put.get(columnFamily, countCol);
int newNum = 0;
for (Cell cell : cells) {
if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
newNum = Integer.valueOf(Bytes.toString(CellUtil.cloneValue(cell)));
}
}
//sum AND update Put例項
put.addColumn(columnFamily, countCol, Bytes.toBytes(String.valueOf(oldNum + newNum)));
}
}
/**
* 2. 不能直接刪除unDeleteCol 刪除countCol的時候將unDeleteCol一同刪除
*/
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
WALEdit edit,
Durability durability) throws IOException {
//判斷是否操作cf列族
List<Cell> cells = delete.getFamilyCellMap().get(columnFamily);
if (cells == null || cells.size() == 0) {
return;
}
boolean deleteFlag = false;
for (Cell cell : cells) {
byte[] qualifier = CellUtil.cloneQualifier(cell);
if (Arrays.equals(qualifier, unDeleteCol)) {
throw new IOException("can not delete unDel column");
}
if (Arrays.equals(qualifier, countCol)) {
deleteFlag = true;
}
}
if (deleteFlag) {
delete.addColumn(columnFamily, unDeleteCol);
}
}
}
3.實現一個Endpoint型別的協處理器
待續。。。。。。