hbase 原始碼解析(21) 自定義過濾器
阿新 • • 發佈:2019-01-10
自定義 filter 需要繼承抽象類 Filter,或者直接繼承 FilterBase
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Filter{
//返回碼
public enum ReturnCode{
INCLUDE, //結果中包含著一樣
INCLUDE_AND_NEXT_COL, //包含著這樣一行,跳到下一行比較
SKIP, //跳到下一個keyvalue 並進行處理
NEXT_COL, //跳過當前一col
NEXT_ROW, //跳過當前一行
SEEK_NEXT_USING_HINT, //跳到下一個滿足地方,需要呼叫getNextKeyHint()
}
protected transient boolean reversed;
abstract public void reset() throws IOException;
//判斷行健是否滿足,不滿足可以跳過,避免其他檢查:比如字首過濾器
abstract public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException;
//這個過濾器可以提前結束
abstract public boolean filterAllRemaining() throws IOException;
- //對cell處理,
abstract public ReturnCode filterKeyValue(final Cell v) throws IOException;
- abstract public Cell transformCell(final Cell v) throws IOException;
@Deprecated// use Cell transformCell(final Cell)
abstract public KeyValue transform(final KeyValue currentKV) throws IOException;
//經過前面處理後,如果還有資料,將對當前行一起處理, 比如依賴過去器
abstract public void filterRowCells(List<Cell> kvs) throws IOException;
abstract public boolean hasFilterRow();
//經過這麼多流程如果還有資料,會去檢查一下資料的要求。比如pagefilter 是否已經夠一頁了
abstract public boolean filterRow() throws IOException;
@Deprecated
abstract public KeyValue getNextKeyHint(final KeyValue currentKV) throws IOException;
abstract public Cell getNextCellHint(final Cell currentKV) throws IOException;
abstract public boolean isFamilyEssential(byte[] name) throws IOException;
abstract public byte[] toByteArray() throws IOException;
public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException {
throw new DeserializationException(
"parseFrom called on base Filter, but should be called on derived type");
}
abstract boolean areSerializedFieldsEqual(Filter other);
public void setReversed(boolean reversed) {
this.reversed = reversed;
}
- public boolean isReversed() {
return this.reversed;
}
}
private boolean nextInternal(List<Cell> results,ScannerContext scannerContext)
throws IOException{
while(true){
- boolean stopRow = isStopRow(currentRow, offset, length);
boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
if (hasFilterRow) {
if (LOG.isTraceEnabled()) {
LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
+ " formed. Changing scope of limits that may create partials");
}
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
}
if (filterRowKey(currentRow, offset, length)) {
incrementCountOfRowsFilteredMetric(scannerContext);
// early check, see HBASE-16296
- //filterAllRemaining 實際呼叫
if (isFilterDoneInternal()) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
- incrementCountOfRowsScannedMetric(scannerContext);
- //裡面會呼叫filter.reset();
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
continue;
}
populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
- Cell nextKv = this.storeHeap.peek();
stopRow = nextKv == null ||
isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
final boolean isEmptyRow = results.isEmpty();
- FilterWrapper.FilterRowRetCode ret =FilterWrapper.FilterRowRetCode.NOT_CALLED;
if(hasFilterRow){
//會呼叫
filterRowCells(List) 和filterRowCells(cell)ret = filter.filterRowCellsWithRet(results);
- long timeProgress = scannerContext.getTimeProgress();
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialTimeProgress);
} else {
scannerContext.clearProgress();
}
scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
}
}
if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear();
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
- if (!stopRow) continue;
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
- if (this.joinedHeap != null) {
boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
}
} else {
- populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
if (stopRow) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} else {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
}
}