HBase Coprocessors (Part 2)
3.7 The RegionObserver Class
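This class works at the region level. Note that the first parameter of every callback method in this class is ObserverContext<RegionCoprocessorEnvironment> ctx, which provides access to the context instance.
The operations can be divided into two groups: region life-cycle changes and client API calls. There is a generic callback for many operations of both kinds: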
enum Operation {
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION,
MERGE_REGION, BATCH_MUTATE, REPLAY_BATCH_MUTATE, COMPACT_REGION
}
postStartRegionOperation(..., Operation operation)
postCloseRegionOperation(..., Operation operation)
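These RegionObserver methods are called whenever one of the Operation types listed above takes place on the region: postStartRegionOperation() when the operation starts, and postCloseRegionOperation() when it is finished.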
3.7.1 Handling Region Life-Cycle Events
-----------------------------------------------------------------------------------------------------------------------------------------
An observer can hook into the pending open, open, and pending close state changes. Each hook is called implicitly by the framework.
● State: pending open
-------------------------------------------------------------------------------------------------------------------------------------
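A region is in this state when it is about to be opened. Observing coprocessors can either piggyback on this process or fail it. To that end, the following callbacks are invoked in order: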
postLogReplay(...)
preOpen(...)
preStoreFileReaderOpen(...)
postStoreFileReaderOpen(...)
preWALRestore(...) / postWALRestore(...)
postOpen(...)
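These methods are called just before the region is opened, before and after the store files are opened, when the WAL is about to be replayed, and after the region has been opened. Your own coprocessor implementation can use these callbacks, for example, to instruct the framework to abort the opening process in preOpen(), or to hook into the postOpen() call to trigger a cache warm-up.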
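The first event, postLogReplay(), is triggered depending on the configured WAL recovery mode: distributed log splitting or log replay, selected with the hbase.master.distributed.log.replay property. The former runs before a region is opened and therefore triggers this callback first. The latter opens the region and then replays the edits, triggering the callback after the region open event.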
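In both modes, though depending on which one is active, the region server applies records from the write-ahead log (WAL), which in turn invokes the observer's preWALRestore() and postWALRestore() methods. With distributed log splitting, this happens after the pending open state but before the open state. Otherwise, the calls happen after the open event, as the log is being replayed. Hooking into the WAL calls gives you control over which mutations are applied during the log replay: you get access to the edit record, which you can use to inspect what is being applied.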
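To make the ordering concrete, here is a minimal, self-contained model of the pending open sequence as listed above, assuming distributed log splitting (so postLogReplay() fires first). It does not use the HBase API: OpenObserver and PendingOpenSimulator are hypothetical names, and returning false from preOpen() stands in for an observer telling the framework to abort the open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface with only the hooks this model needs
interface OpenObserver {
  // Returning false models an observer asking the framework to abort the open
  default boolean preOpen() { return true; }
  // A real observer might trigger a cache warm-up here
  default void postOpen() {}
}

class PendingOpenSimulator {
  // Records the callbacks in the order listed for the pending open state,
  // assuming distributed log splitting (postLogReplay() fires first)
  static List<String> openRegion(OpenObserver observer) {
    List<String> calls = new ArrayList<>();
    calls.add("postLogReplay");
    calls.add("preOpen");
    if (!observer.preOpen()) {
      calls.add("open aborted");
      return calls;
    }
    calls.add("preStoreFileReaderOpen");
    calls.add("postStoreFileReaderOpen");
    calls.add("preWALRestore");
    calls.add("postWALRestore");
    calls.add("postOpen");
    observer.postOpen();
    return calls;
  }
}
```

An observer that vetoes in preOpen() never sees the store file or WAL callbacks, which is why preOpen() is the natural place for admission checks.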
● State: open
-------------------------------------------------------------------------------------------------------------------------------------
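A region is considered open when it is deployed to a region server and fully operational. In this state, for example, the region's in-memory store can be flushed to disk, or the region can be split when it grows too large. The possible hooks are: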
preFlushScannerOpen(...)
preFlush(...) / postFlush(...)
preCompactSelection(...) / postCompactSelection(...)
preCompactScannerOpen(...)
preCompact(...) / postCompact(...)
preSplit(...)
preSplitBeforePONR(...)
preSplitAfterPONR(...)
postSplit(...)
postCompleteSplit(...) / preRollBackSplit(...) / postRollBackSplit(...)
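The method names are fairly self-explanatory: the pre calls are invoked just before their respective operation, and the post calls just after it. For example, using the preSplit() hook you can effectively disable the built-in region splitting process and perform the splits manually. Some calls exist only as pre hooks, others only as post hooks.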
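The following sketch models how a pre hook can short-circuit a built-in action. In HBase 1.x the ObserverContext passed to each callback offers a bypass() method for this purpose, but which hooks honor it differs, so check the API documentation for the hook you use. HookContext, SplitHooks, and SplitRunner are hypothetical, self-contained stand-ins, not HBase classes, and skipping postSplit() after a bypass is a choice of this model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an observer context: a pre hook can set the
// bypass flag to skip the built-in action (this is not the HBase class)
class HookContext {
  private boolean bypass;
  void bypass() { bypass = true; }
  boolean shouldBypass() { return bypass; }
}

interface SplitHooks {
  default void preSplit(HookContext ctx) {}
  default void postSplit() {}
}

class SplitRunner {
  // Runs the pre hook, then the default action unless bypassed, then the post hook
  static List<String> split(SplitHooks hooks) {
    List<String> log = new ArrayList<>();
    HookContext ctx = new HookContext();
    log.add("preSplit");
    hooks.preSplit(ctx);
    if (ctx.shouldBypass()) {
      // The observer takes over; it would perform the split manually
      log.add("built-in split skipped");
      return log;
    }
    log.add("built-in split");
    hooks.postSplit();
    log.add("postSplit");
    return log;
  }
}
```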
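The hooks for flush, compact, and split are directly linked to the matching region housekeeping functions. There are also some more specialized hooks that are called as part of those three functions. For example, the preFlushScannerOpen() hook is called when the scanner for the memstore is set up, which happens just before the actual flush takes place.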
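Similarly, for compactions the server first selects the files to include, a step wrapped in the preCompactSelection()/postCompactSelection() callbacks. After that the store scanner is opened (preCompactScannerOpen()), and finally the actual compaction takes place (preCompact()/postCompact()).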
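For splits, there are several callbacks reflecting the current stage, with a special point-of-no-return (PONR) in between. The PONR is reached after the split process has started but before the final actions take place. The split is handled like an internal transaction: when that transaction is about to be committed, preSplitBeforePONR() is called, and afterward preSplitAfterPONR() is called. Finally, there are calls for the completed or rolled-back outcome (postCompleteSplit(), preRollBackSplit(), postRollBackSplit()), informing you of the result of the split transaction.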
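A minimal model of the split transaction and its PONR, following the callback order listed earlier in this section. It is a hypothetical simulation, not HBase code: a failure before the PONR leads to the rollback callbacks, while a failure after the PONR cannot be rolled back and is simply surfaced as an exception in this model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a split transaction with a point-of-no-return
class SplitTransactionModel {
  enum FailAt { NONE, BEFORE_PONR, AFTER_PONR }

  static List<String> run(FailAt failAt) {
    List<String> calls = new ArrayList<>();
    calls.add("preSplit");
    calls.add("preSplitBeforePONR");   // transaction is about to commit
    if (failAt == FailAt.BEFORE_PONR) {
      // Nothing is final yet, so the split can still be undone
      calls.add("preRollBackSplit");
      calls.add("postRollBackSplit");
      return calls;
    }
    // --- point of no return ---
    calls.add("preSplitAfterPONR");
    if (failAt == FailAt.AFTER_PONR) {
      // Past the PONR there is no rollback path in this model
      throw new IllegalStateException("split failed after PONR, cannot roll back");
    }
    calls.add("postSplit");
    calls.add("postCompleteSplit");    // final outcome: completed
    return calls;
  }
}
```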
● State: pending close
-------------------------------------------------------------------------------------------------------------------------------------
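The last group of observer hooks is called when a region enters the pending close state, which happens as the region transitions from open to closed. Just before and just after the region is closed, the following hooks are executed: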
preClose(..., boolean abortRequested)
postClose(..., boolean abortRequested)
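The abortRequested parameter indicates why the region is being closed. Usually regions are closed during normal operation, for example when a region is moved to a different region server for load-balancing reasons. But a region server may also have gone rogue and be aborted to avoid any side effects. When that happens, all regions hosted by that server are aborted, and you can tell from the given parameter what happened.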
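As a sketch of how the flag might be used, the hypothetical observer below persists its buffered state on a normal close but skips that work when abortRequested is true, since an aborting server should shut down quickly. The class and its buffer are assumptions for illustration that only mirror the shape of preClose(..., boolean abortRequested); there is no HBase dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer state: an in-memory buffer that should be persisted
// on a normal close, but dropped when the server is aborting
class CloseAwareObserver {
  private final List<String> buffer = new ArrayList<>();
  private final List<String> persisted = new ArrayList<>();

  void record(String event) { buffer.add(event); }

  // Mirrors the shape of preClose(..., boolean abortRequested)
  void preClose(boolean abortRequested) {
    if (abortRequested) {
      // Abnormal shutdown: avoid slow or risky work, just discard the state
      buffer.clear();
      return;
    }
    // Normal close (e.g. region moved by the balancer): persist first
    persisted.addAll(buffer);
    buffer.clear();
  }

  List<String> persisted() { return persisted; }
}
```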
In addition, this class inherits the start() and stop() methods, allowing you to allocate and release lifetime resources.
3.7.2 Handling Client API Events
-----------------------------------------------------------------------------------------------------------------------------------------
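As opposed to the life-cycle events, all client API calls are explicitly sent from a client application to the region server. You can hook into these calls just before they are applied and just after. The following table lists the available API calls and the related callback methods: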
Callbacks for client API functions
+-------------------------------+-------------------------------------------+--------------------------------------------
| API Call | Pre-Hook | Post-Hook
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.put() | prePut(...) | postPut(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.checkAndPut() | preCheckAndPut(...) | postPut(...)
| | preCheckAndPutAfterRowLock(...) | postCheckAndPut(...)
| | prePut(...) |
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.get() | preGetOp(...) | postGetOp(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.delete() | preDelete(...) | postDelete(...)
| Table.batch() | prePrepareTimeStampForDeleteVersion(...) |
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.checkAndDelete() | preCheckAndDelete(...) | postDelete(...)
| | preCheckAndDeleteAfterRowLock(...) | postCheckAndDelete(...)
| | preDelete(...) |
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.mutateRow() | preBatchMutate(...) | postBatchMutate(...)
| | prePut(...)/preGetOp(...) | postPut(...)/postGetOp(...)
| | | postBatchMutateIndispensably()
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.append() | preAppend(...) | postMutationBeforeWAL(...)
| | preAppendAfterRowLock() | postAppend(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.batch() | preBatchMutate(...) | postPut(...)/postGetOp(...)
| | prePut(...)/preGetOp(...)/preDelete(...) | postBatchMutate(...)
| | prePrepareTimeStampForDeleteVersion(...) |
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.checkAndMutate() | preBatchMutate(...) | postBatchMutate(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.getScanner() | preScannerOpen(...) | postInstantiateDeleteTracker(...)
| | preStoreScannerOpen(...) | postScannerOpen(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| ResultScanner.next() | preScannerNext(...) | postScannerFilterRow(...)
| | | postScannerNext(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| ResultScanner.close() | preScannerClose(...) | postScannerClose(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.increment(), | preIncrement(...) | postMutationBeforeWAL(...)
| Table.batch() | preIncrementAfterRowLock(...) | postIncrement(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.incrementColumnValue() | preIncrementColumnValue(...) | postIncrementColumnValue(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.getClosestRowBefore() | preGetClosestRowBefore(...) | postGetClosestRowBefore(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.exists() | preExists(...) | postExists(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| completebulkload (tool) | preBulkLoadHFile(...) | postBulkLoadHFile(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
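The events in the table are listed in order: when more than one event occurs for a call, they occur from top to bottom. A slash "/" marks alternatives: one of the listed events occurs, depending on the operations the call actually contains. For details on each callback method, refer to the API documentation or the source code.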
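For example, reading the Table.checkAndPut() row top to bottom gives the order below. This is a self-contained, hypothetical model rather than HBase code, and it assumes that the prePut()/postPut() pair only fires when the check succeeds, while postCheckAndPut() always fires with the check result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the hook order for Table.checkAndPut(), following the
// pre-hook and post-hook columns of the table from top to bottom
class CheckAndPutModel {
  static List<String> checkAndPut(byte[] currentValue, byte[] expectedValue) {
    List<String> calls = new ArrayList<>();
    calls.add("preCheckAndPut");
    calls.add("preCheckAndPutAfterRowLock");   // the row lock is now held
    boolean matches = Arrays.equals(currentValue, expectedValue);
    if (matches) {
      // Assumption: the put hooks only fire when the check succeeds
      calls.add("prePut");
      calls.add("postPut");
    }
    calls.add("postCheckAndPut");              // receives the check result
    return calls;
  }
}
```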
Example: Observer collecting invocation statistics
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
// ObserverStatisticsProtos is generated from the example's .proto file (import omitted)

@SuppressWarnings("deprecation") // because of API usage
public class ObserverStatisticsEndpoint
    extends ObserverStatisticsProtos.ObserverStatisticsService
    implements Coprocessor, CoprocessorService, RegionObserver {

  private RegionCoprocessorEnvironment env;
  // Guarded by synchronized (stats): callbacks run concurrently on RPC handler threads
  private final Map<String, Integer> stats = new LinkedHashMap<>();

  // Lifecycle methods
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }
  ...
  // Endpoint methods
  @Override
  public void getStatistics(RpcController controller,
      ObserverStatisticsProtos.StatisticsRequest request,
      RpcCallback<ObserverStatisticsProtos.StatisticsResponse> done) {
    ObserverStatisticsProtos.StatisticsResponse response = null;
    try {
      ObserverStatisticsProtos.StatisticsResponse.Builder builder =
        ObserverStatisticsProtos.StatisticsResponse.newBuilder();
      // Copy and (optionally) clear under the same lock, so iteration cannot
      // race with addCallCount() and no update is lost between copy and clear
      synchronized (stats) {
        for (Map.Entry<String, Integer> entry : stats.entrySet()) {
          builder.addAttribute(ObserverStatisticsProtos.NameInt32Pair.newBuilder()
            .setName(entry.getKey())
            .setValue(entry.getValue())
            .build());
        }
        // optionally clear out stats
        if (request.hasClear() && request.getClear()) {
          stats.clear();
        }
      }
      response = builder.build();
    } catch (Exception e) {
      ResponseConverter.setControllerException(controller, new IOException(e));
    }
    // response stays null if an error was reported on the controller
    done.run(response);
  }

  /**
   * Internal helper to keep track of call counts.
   *
   * @param call The name of the call.
   */
  private void addCallCount(String call) {
    synchronized (stats) {
      stats.merge(call, 1, Integer::sum);
    }
  }

  // All Observer callbacks follow here
  @Override
  public void preOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext)
      throws IOException {
    addCallCount("preOpen");
  }

  @Override
  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext) {
    addCallCount("postOpen");
  }
  ...
}
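The example's counting logic can be tried without a cluster. This plain-Java sketch (CallStats is a hypothetical name) keeps the same synchronized LinkedHashMap, so counters appear in first-call order, and adds an atomic snapshot-and-clear corresponding to the endpoint's optional clear flag. Observer callbacks can run concurrently on different RPC handler threads, which is why every access holds the same lock.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, HBase-free version of the call counter
class CallStats {
  private final Map<String, Integer> stats = new LinkedHashMap<>();

  // Same role as addCallCount() in the endpoint example
  void addCallCount(String call) {
    synchronized (stats) {
      stats.merge(call, 1, Integer::sum);
    }
  }

  // Returns a copy in first-seen order and optionally resets the counters,
  // atomically with respect to addCallCount()
  Map<String, Integer> snapshot(boolean clear) {
    synchronized (stats) {
      Map<String, Integer> copy = new LinkedHashMap<>(stats);
      if (clear) {
        stats.clear();
      }
      return copy;
    }
  }
}
```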
3.7.3 The RegionCoprocessorEnvironment Class
-----------------------------------------------------------------------------------------------------------------------------------------
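The environment instance provided to a coprocessor implementing the RegionObserver interface is based on the RegionCoprocessorEnvironment class, which in turn implements the CoprocessorEnvironment interface. In addition to the methods already provided by that interface, RegionCoprocessorEnvironment offers some specialized, region-specific methods, listed in the following table: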
Specific methods provided by the RegionCoprocessorEnvironment class
+---------------------------+---------------------------------------------------------------------------------------+
| Method | Description |
+---------------------------+---------------------------------------------------------------------------------------+
| getRegion() | Returns a reference to the region the current observer is associated with. |
+---------------------------+---------------------------------------------------------------------------------------+
| getRegionInfo() | Get information about the region associated with the current coprocessor instance. |
+---------------------------+---------------------------------------------------------------------------------------+
| getRegionServerServices() | Provides access to the shared RegionServerServices instance. |
+---------------------------+---------------------------------------------------------------------------------------+
| getSharedData() | All the shared data between the instances of this coprocessor |
+---------------------------+---------------------------------------------------------------------------------------+
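The getRegion() call can be used to get a reference to the HRegion instance hosted on the current server, and to invoke the methods the HRegion class provides. If you need information about the region, call getRegionInfo() to retrieve an HRegionInfo instance. This class has useful methods that give access to the range of contained keys, the name of the region, and flags about its state: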
byte[] getStartKey()
byte[] getEndKey()
byte[] getRegionName()
boolean isSystemTable()
int getReplicaId()
...
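The start and end keys define a half-open range. The helper below is a hypothetical, self-contained illustration of that rule: the start key is inclusive, the end key is exclusive, and an empty array means the range is unbounded on that side (the first or last region of a table). Keys compare as unsigned bytes, like HBase's Bytes.compareTo(). In real code, HRegionInfo in HBase 1.x already offers containsRow(byte[]) for this check.

```java
// Hypothetical helper illustrating the key range described by HRegionInfo
class RegionRange {
  // Lexicographic comparison of unsigned bytes; a shorter prefix sorts first
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int x = a[i] & 0xff;
      int y = b[i] & 0xff;
      if (x != y) {
        return x - y;
      }
    }
    return a.length - b.length;
  }

  // startKey inclusive, endKey exclusive; an empty endKey means "no upper bound"
  static boolean containsRow(byte[] startKey, byte[] endKey, byte[] row) {
    boolean afterStart = compareUnsigned(row, startKey) >= 0; // empty start always passes
    boolean beforeEnd = endKey.length == 0 || compareUnsigned(row, endKey) < 0;
    return afterStart && beforeEnd;
  }
}
```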
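In addition, your code can access the shared region server services by calling getRegionServerServices(), which returns a RegionServerServices instance. This class provides many advanced methods, listed in the following table: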
Methods provided by the RegionServerServices class
+-------------------------------+-----------------------------------------------------------------------------------------
| Method | Description
+-------------------------------+-----------------------------------------------------------------------------------------
| abort() | Allows aborting the entire server process, shutting down the instance with the given reason.
+-------------------------------+-----------------------------------------------------------------------------------------
| addToOnlineRegions() | Adds a given region to the list of online regions. This is used for internal bookkeeping.
+-------------------------------+-----------------------------------------------------------------------------------------
| getCompactionRequester() | Provides access to the shared CompactionRequestor instance. This can be used to initiate
| | compactions from within the coprocessor.
+-------------------------------+-----------------------------------------------------------------------------------------
| getConfiguration() | Returns the current server configuration.
+-------------------------------+-----------------------------------------------------------------------------------------
| getConnection() | Provides access to the shared connection instance.
+-------------------------------+-----------------------------------------------------------------------------------------
| getCoordinatedStateManager() | Access to the shared state manager, gives access to the TableStateManager, which in turn can
| | be used to check on the state of a table
+-------------------------------+-----------------------------------------------------------------------------------------
| getExecutorService()          | Returns the executor service the region server uses to run internal tasks.
+-------------------------------+-----------------------------------------------------------------------------------------
| getFileSystem()               | Returns the FileSystem instance used by the region server.
+-------------------------------+-----------------------------------------------------------------------------------------
| getFlushRequester() | Provides access to the shared FlushRequester instance. This can be used to initiate memstore flushes.
+-------------------------------+-----------------------------------------------------------------------------------------
| getFromOnlineRegions()        | Returns an HRegion instance for a given region; the region must be hosted by the same server.
+-------------------------------+-----------------------------------------------------------------------------------------
| getHeapMemoryManager() | Provides access to a manager instance, gives access to heap related information, such as occupancy.
+-------------------------------+-----------------------------------------------------------------------------------------
| getLeases() | Returns the list of leases, as acquired for example by client side scanners.
+-------------------------------+-----------------------------------------------------------------------------------------
| getMetaTableLocator() | The method returns a class providing system table related functionality.
+-------------------------------+-----------------------------------------------------------------------------------------
| getNonceManager()             | Gives access to the nonce manager, which tracks operation nonces so that retried non-idempotent operations (e.g., increments) are not applied twice.
+-------------------------------+-----------------------------------------------------------------------------------------
| getOnlineRegions() | Lists all online regions on the current server for a given table.
+-------------------------------+-----------------------------------------------------------------------------------------
| getRecoveringRegions() | Lists all regions that are currently in the process of replaying WAL entries.
+-------------------------------+-----------------------------------------------------------------------------------------
| getRegionServerAccounting() | Provides access to the shared RegionServerAccounting instance. It allows you to check on what
| | the server currently has allocated—for example, the global memstore size.
+-------------------------------+-----------------------------------------------------------------------------------------
| getRegionsInTransitionInRS() | List of regions that are currently in-transition.
+-------------------------------+-----------------------------------------------------------------------------------------
| getRpcServer() | Returns a reference to the low-level RPC implementation instance
+-------------------------------+-----------------------------------------------------------------------------------------
| getServerName() | The server name, which is unique for every region server process.
+-------------------------------+-----------------------------------------------------------------------------------------
| getTableLockManager() | Gives access to the lock manager. Can be used to acquire read and write locks for the entire table.
+-------------------------------+-----------------------------------------------------------------------------------------
| getWAL() | Provides access to the write-ahead log instance.
+-------------------------------+-----------------------------------------------------------------------------------------
| getZooKeeper() | Returns a reference to the ZooKeeper watcher instance.
+-------------------------------+-----------------------------------------------------------------------------------------
| isAborted() | Flag is true when abort() was called previously
+-------------------------------+-----------------------------------------------------------------------------------------
| isStopped() | Returns true when stop() (inherited from Stoppable) was called beforehand
+-------------------------------+-----------------------------------------------------------------------------------------
| isStopping() | Returns true when the region server is stopping.
+-------------------------------+-----------------------------------------------------------------------------------------
| postOpenDeployTasks() | Called by the region server after opening a region, does internal housekeeping work.
+-------------------------------+-----------------------------------------------------------------------------------------
| registerService() | Registers a new custom service. Called when server starts and coprocessors are loaded.
+-------------------------------+-----------------------------------------------------------------------------------------
| removeFromOnlineRegions() | Removes a given region from the internal list of online regions.
+-------------------------------+-----------------------------------------------------------------------------------------
| reportRegionStateTransition() | Triggers a report chain when a state change is needed for a region. Sent to the Master
+-------------------------------+-----------------------------------------------------------------------------------------
| stop() | Stops the server gracefully
+-------------------------------+-----------------------------------------------------------------------------------------
You do not need to implement the full RegionObserver interface yourself: you can extend the BaseRegionObserver class instead and override only the callbacks you need.
3.7.4 The BaseRegionObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
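This class can be used as the basis for all observer-type coprocessors. It provides empty implementations of all methods required by the RegionObserver interface, so by default it does nothing. You need to override every callback you are interested in to add your own functionality.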
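The idea behind BaseRegionObserver is the classic adapter pattern. The trimmed-down sketch below uses hypothetical names (MiniRegionObserver, BaseMiniRegionObserver, GetCountingObserver) and only four callbacks: the base class implements every callback as a no-op, so the subclass overrides just the hook it cares about. Note that HBase 2.x dropped BaseRegionObserver in favor of default methods on RegionObserver itself; the sketch follows the 1.x style described here.

```java
// Hypothetical, trimmed-down callback interface
interface MiniRegionObserver {
  void preOpen();
  void postOpen();
  void preGetOp(String row);
  void postGetOp(String row);
}

// Adapter: every callback is an empty no-op, so by default nothing happens
abstract class BaseMiniRegionObserver implements MiniRegionObserver {
  @Override public void preOpen() {}
  @Override public void postOpen() {}
  @Override public void preGetOp(String row) {}
  @Override public void postGetOp(String row) {}
}

// Overrides only the hook it is interested in
class GetCountingObserver extends BaseMiniRegionObserver {
  int gets;

  @Override
  public void preGetOp(String row) {
    gets++;
  }
}
```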
Example: Region observer checking for special get requests
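package coprocessor;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionObserverExample extends BaseRegionObserver {
  public static final byte[] FIXED_ROW = Bytes.toBytes("@@@");
  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
      List<Cell> results) throws IOException {
    // Check if the request row key matches a well-known one.
    if (Bytes.equals(get.getRow(), FIXED_ROW)) {
      // Use a throwaway Put to build a Cell holding the current server time
      Put put = new Put(get.getRow());
      put.addColumn(FIXED_ROW, FIXED_ROW, Bytes.toBytes(System.currentTimeMillis()));
      CellScanner scanner = put.cellScanner();
      scanner.advance();
      Cell cell = scanner.current();
      // Adding the cell to the results sends it back to the client
      results.add(cell);
    }
  }
}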
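On the client side, the synthetic cell's value is the output of Bytes.toBytes(long), an 8-byte big-endian encoding, so a client would typically decode it with Bytes.toLong(result.getValue(FIXED_ROW, FIXED_ROW)). The sketch below reproduces that encoding with java.nio.ByteBuffer (big-endian by default) so it runs without HBase; TimestampCodec is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Stand-in for Bytes.toBytes(long) / Bytes.toLong(byte[]): 8 bytes, big-endian
class TimestampCodec {
  static byte[] encode(long millis) {
    return ByteBuffer.allocate(Long.BYTES).putLong(millis).array();
  }

  static long decode(byte[] value) {
    if (value.length != Long.BYTES) {
      throw new IllegalArgumentException("expected 8 bytes, got " + value.length);
    }
    return ByteBuffer.wrap(value).getLong();
  }
}
```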
NOTE
-------------------------------------------------------------------------------------------------------------------------------------
Add the following property to the hbase-site.xml configuration file to enable this coprocessor:
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>coprocessor.RegionObserverExample</value>
</property>
3.8 The MasterObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
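The second subclass of Coprocessor discussed here handles all the callbacks the master server may initiate. These operations can be classified as data-definition operations, similar to the DDL used in relational database systems. The MasterObserver class provides the following hooks: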
Callbacks for master API functions
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| API Call | Shell Call | Pre-Hook | Post-Hook
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| createTable() | create | preCreateTable(...) | postCreateTable(...)
| | | preCreateTableHandler(...) |
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| deleteTable() | drop | preDeleteTable(...) | postDeleteTableHandler(...),
| deleteTables() | | preDeleteTableHandler(...) | postDeleteTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| modifyTable() | alter | preModifyTable(...) | postModifyTableHandler(...),
| | | preModifyTableHandler(...) | postModifyTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| addColumn()                   | alter                 | preAddColumn(...)                 | postAddColumnHandler(...),
| | | preAddColumnHandler(...) | postAddColumn(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| deleteColumn()                | alter                 | preDeleteColumn(...)              | postDeleteColumnHandler(...),
| | | preDeleteColumnHandler(...) | postDeleteColumn(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| modifyColumn()                | alter                 | preModifyColumn(...)              | postModifyColumnHandler(...),
| | | preModifyColumnHandler(...) | postModifyColumn(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| enableTable() | enable | preEnableTable(...) | postEnableTableHandler(...),
| enableTables() | | preEnableTableHandler(...) | postEnableTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| disableTable() | disable | preDisableTable(...), | postDisableTableHandler(...),
| disableTables() | | preDisableTableHandler(...) | postDisableTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| flush() | flush | preTableFlush(...) | postTableFlush(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| truncateTable() | truncate | preTruncateTable(...) | postTruncateTableHandler(...),
| | | preTruncateTableHandler(...) | postTruncateTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| move() | move | preMove(...) | postMove(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| assign() | assign | preAssign(...) | postAssign(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| unassign() | unassign | preUnassign(...) | postUnassign(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| offline() | n/a | preRegionOffline(...) | postRegionOffline(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| balancer() | balancer | preBalance(...) | postBalance(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| setBalancerRunning() | balance_switch | preBalanceSwitch(...) | postBalanceSwitch(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| listTableNames() | list | preGetTableNames(...) | postGetTableNames(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| getTableDescriptors() | list | preGetTableDescriptors(...) | postGetTableDescriptors(...)
| listTables() | | |
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| createNamespace() | create_namespace | preCreateNamespace(...) | postCreateNamespace(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| deleteNamespace() | drop_namespace | preDeleteNamespace(...) | postDeleteNamespace(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| getNamespaceDescriptor() | describe_namespace | preGetNamespaceDescriptor(...) | postGetNamespaceDescriptor(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| listNamespaceDescriptors() | list_namespace | preListNamespaceDescriptors(...) | postListNamespaceDescriptors(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| modifyNamespace() | alter_namespace | preModifyNamespace(...) | postModifyNamespace(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| cloneSnapshot() | clone_snapshot | preCloneSnapshot(...) | postCloneSnapshot(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| deleteSnapshot(), | delete_snapshot | preDeleteSnapshot(...) | postDeleteSnapshot(...)
| deleteSnapshots() | delete_all_snapshot | |
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| restoreSnapshot() | restore_snapshot | preRestoreSnapshot(...) | postRestoreSnapshot(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| snapshot() | snapshot | preSnapshot(...) | postSnapshot(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| shutdown() | n/a | preShutdown(...) | n/a
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| stopMaster() | n/a | preStopMaster(...) | n/a
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| n/a | n/a | preMasterInitialization(...) | postStartMaster(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
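Throwing an exception from a pre hook is the usual way for a MasterObserver to veto an operation, for example rejecting a table drop in preDeleteTable(). The following self-contained model shows that control flow with hypothetical classes (ProtectedTablesGuard, MiniMaster) and plain String table names instead of HBase's TableName: the deletion proceeds only if the pre hook returns normally.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical pre hook that rejects dropping protected tables by throwing
class ProtectedTablesGuard {
  private final Set<String> protectedTables = new HashSet<>();

  ProtectedTablesGuard(String... names) {
    for (String n : names) {
      protectedTables.add(n);
    }
  }

  void preDeleteTable(String tableName) throws IOException {
    if (protectedTables.contains(tableName)) {
      throw new IOException("Table " + tableName + " is protected and cannot be dropped");
    }
  }
}

// Hypothetical master that consults the pre hook before acting
class MiniMaster {
  private final Set<String> tables = new HashSet<>();
  private final ProtectedTablesGuard guard;

  MiniMaster(ProtectedTablesGuard guard, String... existing) {
    this.guard = guard;
    for (String t : existing) {
      tables.add(t);
    }
  }

  // The deletion only happens if the pre hook did not throw
  void deleteTable(String name) throws IOException {
    guard.preDeleteTable(name);
    tables.remove(name);
  }

  boolean exists(String name) {
    return tables.contains(name);
  }
}
```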
3.8.1 The MasterCoprocessorEnvironment Class
-----------------------------------------------------------------------------------------------------------------------------------------
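Similar to how the RegionCoprocessorEnvironment is tied to RegionObserver coprocessors, the MasterCoprocessorEnvironment is wrapped around MasterObserver instances. It also implements the CoprocessorEnvironment interface, so you can, for example, access data in your own implementation through the getTable() call. In addition to the methods provided by the CoprocessorEnvironment interface, it offers some specialized, master-specific methods, as listed in the following table: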
Specific method provided by the MasterCoprocessorEnvironment class
+---------------------------+---------------------------------------------------------------------------------------+
| Method | Description |
+---------------------------+---------------------------------------------------------------------------------------+
| getMasterServices() | Provides access to the shared MasterServices instance |
+---------------------------+---------------------------------------------------------------------------------------+
Your code can access the shared master services instance, which exposes many of the Master admin APIs, roughly grouped as follows:
Table-related methods:
createTable(HTableDescriptor, byte[][])
deleteTable(TableName)
modifyTable(TableName, HTableDescriptor)
enableTable(TableName)
disableTable(TableName)
getTableDescriptors()
truncateTable(TableName, boolean)
addColumn(TableName, HColumnDescriptor)
deleteColumn(TableName, byte[])
modifyColumn(TableName, HColumnDescriptor)
Namespace-related methods:
createNamespace(NamespaceDescriptor)
deleteNamespace(String)
modifyNamespace(NamespaceDescriptor)
getNamespaceDescriptor(String)
listNamespaceDescriptors()
listTableDescriptorsByNamespace(String)
listTableNamesByNamespace(String)
The following table lists the more specific methods, with a short description of each:
Methods provided by the MasterServices class
+-----------------------------------+-------------------------------------------------------------------------------------
| Method | Description
+-----------------------------------+-------------------------------------------------------------------------------------
| abort() | Allows aborting the entire server process, shutting down the instance with the given reason.
+-----------------------------------+-------------------------------------------------------------------------------------
| checkTableModifiable() | Convenient to check if a table exists and is offline so that it can be altered.
+-----------------------------------+-------------------------------------------------------------------------------------
| dispatchMergingRegions() | Flags two regions to be merged, which is performed on the region servers.
+-----------------------------------+-------------------------------------------------------------------------------------
| getAssignmentManager() | Gives you access to the assignment manager instance. It is responsible for all region
| | assignment operations, such as assign, unassign, balance, and so on
+-----------------------------------+-------------------------------------------------------------------------------------
| getConfiguration() | Returns the current server configuration.
+-----------------------------------+-------------------------------------------------------------------------------------
| getConnection() | Provides access to the shared connection instance.
+-----------------------------------+-------------------------------------------------------------------------------------
| getCoordinatedStateManager() | Access to the shared state manager, gives access to the TableStateManager, which in turn
| | can be used to check on the state of a table
+-----------------------------------+-------------------------------------------------------------------------------------
| getExecutorService() | Used by the master to schedule system-wide events
+-----------------------------------+-------------------------------------------------------------------------------------
| getMasterCoprocessorHost() | Returns the enclosing host instance.
+-----------------------------------+-------------------------------------------------------------------------------------
| getMasterFileSystem() | Provides you with an abstraction layer for all filesystem-related operations the master is
| | involved in, for example, creating directories for table files and logfiles.
+-----------------------------------+-------------------------------------------------------------------------------------
| getMetaTableLocator() | The method returns a class providing system table related functionality.
+-----------------------------------+-------------------------------------------------------------------------------------
| getServerManager() | Returns the server manager instance. With it you have access to the list of servers, live or
| | considered dead, and more.
+-----------------------------------+-------------------------------------------------------------------------------------
| getServerName() | The server name, which is unique for every region server process
+-----------------------------------+-------------------------------------------------------------------------------------
| getTableLockManager() | Gives access to the lock manager. Can be used to acquire read and write locks for the entire table.
+-----------------------------------+-------------------------------------------------------------------------------------
| getZooKeeper() | Returns a reference to the ZooKeeper watcher instance.
+-----------------------------------+-------------------------------------------------------------------------------------
| isAborted() | Flag is true when abort() was called previously.
+-----------------------------------+-------------------------------------------------------------------------------------
| isInitialized() | After the server process is operational, this call will return true.
+-----------------------------------+-------------------------------------------------------------------------------------
| isServerShutdownHandlerEnabled() | When an optional shutdown handler was set, this check returns true.
+-----------------------------------+-------------------------------------------------------------------------------------
| isStopped() | Returns true when stop() (inherited from Stoppable) was called beforehand.
+-----------------------------------+-------------------------------------------------------------------------------------
| registerService() | Registers a new custom service. Called when the server starts and coprocessors are loaded.
+-----------------------------------+-------------------------------------------------------------------------------------
| stop() | Stops the server gracefully.
+-----------------------------------+-------------------------------------------------------------------------------------
3.8.2 The BaseMasterObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
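You can implement the MasterObserver interface directly, or extend the BaseMasterObserver class. BaseMasterObserver implements the
MasterObserver interface, but all of its callbacks are empty implementations. If you use this class unchanged, it does not react to anything.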
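You add functionality by overriding the appropriate event callbacks, hooking your own code into the pre or post methods. The following example
uses a post hook to perform an additional task after a table has been created.
Example: A master observer that creates a separate directory on the file system when a table is created.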
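package coprocessor;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
public class MasterObserverExample extends BaseMasterObserver {
  @Override
  public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
    TableName tableName = desc.getTableName();
    // Reach the master's file system through the shared MasterServices.
    MasterServices services = ctx.getEnvironment().getMasterServices();
    MasterFileSystem masterFileSystem = services.getMasterFileSystem();
    FileSystem fileSystem = masterFileSystem.getFileSystem();
    // Relative path "<table>-blobs", resolved against the working directory.
    Path blobPath = new Path(tableName.getQualifierAsString() + "-blobs");
    fileSystem.mkdirs(blobPath);
  }
}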
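To make the base-class idea concrete without an HBase dependency, the following plain-Java sketch models how MasterObserver, BaseMasterObserver and a subclass such as MasterObserverExample relate. All names in it (TableObserver, BaseTableObserver, BlobDirObserver, FakeMaster) are hypothetical stand-ins, not HBase classes, and the signatures are simplified. It only illustrates that a base class with empty callbacks lets a subclass override just the hook it needs, and that the host calls the pre hook before and the post hook after the operation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MasterObserver: one pre and one post hook.
interface TableObserver {
  void preCreateTable(String table);
  void postCreateTable(String table);
}

// Hypothetical stand-in for BaseMasterObserver: every callback is a no-op.
abstract class BaseTableObserver implements TableObserver {
  @Override public void preCreateTable(String table) { }
  @Override public void postCreateTable(String table) { }
}

// Overrides only the post hook, like MasterObserverExample does.
class BlobDirObserver extends BaseTableObserver {
  final List<String> createdDirs = new ArrayList<>();

  @Override
  public void postCreateTable(String table) {
    // Mirrors the "<table>-blobs" naming used in the HBase example.
    createdDirs.add(table + "-blobs");
  }
}

// Hypothetical host: runs the pre hooks, the operation, then the post hooks.
class FakeMaster {
  private final List<TableObserver> observers = new ArrayList<>();
  final List<String> tables = new ArrayList<>();

  void register(TableObserver observer) { observers.add(observer); }

  void createTable(String table) {
    for (TableObserver o : observers) o.preCreateTable(table);
    tables.add(table); // the actual operation
    for (TableObserver o : observers) o.postCreateTable(table);
  }
}
```

Registering a BlobDirObserver with a FakeMaster and calling createTable("testtable3") records the directory name "testtable3-blobs", analogous to the shell session shown further below.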
NOTE
-------------------------------------------------------------------------------------------------------------------------------------
Add the following property to the hbase-site.xml configuration file to load this coprocessor when the master process starts:
<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>
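After making the change, restart HBase for the configuration to take effect.
Once the coprocessor is active, it listens for the events it is interested in and automatically executes your code when they occur.
You can trigger the event with the following shell command: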
hbase(main):001:0> create 'testtable3', 'colfam1'
0 row(s) in 0.6740 seconds
This creates a table, which subsequently invokes the coprocessor's postCreateTable() method. Verify the result with the Hadoop command-line tool:
$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x - larsgeorge supergroup 0 ... testtable3-blobs
3.8.3 The BaseMasterAndRegionObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
HBase also provides another related base class, BaseMasterAndRegionObserver. It combines the BaseRegionObserver class with the MasterObserver interface:
public abstract class BaseMasterAndRegionObserver
extends BaseRegionObserver implements MasterObserver {
...
}
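In effect, it combines BaseRegionObserver and BaseMasterObserver into a single class. Because this class provides implementations for both a
region server and the master server, it is only useful when run on the HBase Master. This is useful for placing system tables directly on the master.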
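Because Java allows only one superclass, a class like BaseMasterAndRegionObserver has to extend the region base class and implement the master interface itself. The following sketch models that shape with hypothetical, heavily simplified stand-ins (RegionHooks, MasterHooks, BaseRegionHooks, BaseMasterAndRegionHooks, AuditHooks); none of these are HBase types, and each interface has just one callback for brevity.

```java
// Hypothetical stand-ins for RegionObserver and MasterObserver.
interface RegionHooks { void postGet(String row); }
interface MasterHooks { void postCreateTable(String table); }

// Like BaseRegionObserver: empty region callbacks.
abstract class BaseRegionHooks implements RegionHooks {
  @Override public void postGet(String row) { }
}

// Like BaseMasterAndRegionObserver: it extends the region base class and
// supplies its own empty bodies for the master interface.
abstract class BaseMasterAndRegionHooks extends BaseRegionHooks implements MasterHooks {
  @Override public void postCreateTable(String table) { }
}

// A single class that reacts to both master and region events.
class AuditHooks extends BaseMasterAndRegionHooks {
  final StringBuilder log = new StringBuilder();

  @Override public void postGet(String row) {
    log.append("get:").append(row).append(';');
  }

  @Override public void postCreateTable(String table) {
    log.append("create:").append(table).append(';');
  }
}
```

The same AuditHooks instance can be handed to code expecting either a RegionHooks or a MasterHooks, which is the point of combining both roles in one class.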
3.9 The RegionServerObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
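This class works at the region server level. It exposes well-defined hooks tied to server-wide functionality, that is, functionality that spans multiple regions and tables. The hooks are: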
● postCreateReplicationEndPoint(...)
-------------------------------------------------------------------------------------------------------------------------------------
Called after the server has created a replication endpoint (not to be confused with a coprocessor endpoint).
● preMerge(...), postMerge(...)
-------------------------------------------------------------------------------------------------------------------------------------
Called when two regions are merged.
● preMergeCommit(...), postMergeCommit(...)
-------------------------------------------------------------------------------------------------------------------------------------
Same as above, but with a narrower scope. Called after preMerge() and before postMerge().
● preRollBackMerge(...), postRollBackMerge(...)
-------------------------------------------------------------------------------------------------------------------------------------
These two methods are called when a region merge fails and the merge transaction is rolled back.
● preReplicateLogEntries(...), postReplicateLogEntries(...)
-------------------------------------------------------------------------------------------------------------------------------------
Hooks into the WAL entry replay process, allowing special handling of each log entry.
● preRollWALWriterRequest(...), postRollWALWriterRequest(...)
-------------------------------------------------------------------------------------------------------------------------------------
Hooks into WAL file rolling, which happens based on file size, time, or a manual request.
● preStopRegionServer(...)
-------------------------------------------------------------------------------------------------------------------------------------
This is a pre-only hook, called when the stop() method, inherited from the Stoppable interface, is invoked.
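The merge hooks above nest: preMergeCommit()/postMergeCommit() fire inside the preMerge()/postMerge() pair, and a failed merge takes the rollback path instead. The following sketch simulates that ordering. MergeHooks, RecordingMergeHooks and MergeTransactionSketch are hypothetical names, the signatures drop the ObserverContext and region arguments of the real RegionServerObserver, and the driver assumes for simplicity that a failure occurs before the commit step; it is not HBase's merge transaction code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of the RegionServerObserver merge hooks (simplified signatures).
interface MergeHooks {
  void preMerge();
  void preMergeCommit();
  void postMergeCommit();
  void postMerge();
  void preRollBackMerge();
  void postRollBackMerge();
}

// Records every callback so the ordering can be inspected.
class RecordingMergeHooks implements MergeHooks {
  final List<String> calls = new ArrayList<>();
  public void preMerge() { calls.add("preMerge"); }
  public void preMergeCommit() { calls.add("preMergeCommit"); }
  public void postMergeCommit() { calls.add("postMergeCommit"); }
  public void postMerge() { calls.add("postMerge"); }
  public void preRollBackMerge() { calls.add("preRollBackMerge"); }
  public void postRollBackMerge() { calls.add("postRollBackMerge"); }
}

// Hypothetical merge driver that only reproduces the hook ordering.
class MergeTransactionSketch {
  static void run(MergeHooks hooks, boolean failBeforeCommit) {
    hooks.preMerge();
    if (failBeforeCommit) {
      // The merge failed, so the transaction is rolled back instead of committed.
      hooks.preRollBackMerge();
      hooks.postRollBackMerge();
      return;
    }
    hooks.preMergeCommit();  // narrower scope: wraps only the commit step
    hooks.postMergeCommit();
    hooks.postMerge();
  }
}
```

Running it once with failBeforeCommit set to false yields preMerge, preMergeCommit, postMergeCommit, postMerge; with true it yields preMerge, preRollBackMerge, postRollBackMerge.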
3.9.1 The RegionServerCoprocessorEnvironment Class
-----------------------------------------------------------------------------------------------------------------------------------------
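This environment also implements the CoprocessorEnvironment interface, so your implementation can use the getTable() call to access data.
In addition to the methods provided by CoprocessorEnvironment, it offers a server-specific method: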
Specific method provided by the RegionServerCoprocessorEnvironment class
+-----------------------------------+-------------------------------------------------------------------------------------
| Method | Description
+-----------------------------------+-------------------------------------------------------------------------------------
| getRegionServerServices() | Provides access to the shared RegionServerServices instance.
+-----------------------------------+-------------------------------------------------------------------------------------
3.9.2 The BaseRegionServerObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
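An empty implementation of the RegionServerObserver interface. By extending it, you can concentrate on the problem at hand and override only the methods you need, which simplifies your implementation.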
3.10 The WALObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
An observer class related to the write-ahead log (WAL for short). It offers the following easy-to-use callbacks:
preWALWrite(...), postWALWrite(...)
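These wrap the log entry being written to the WAL and give access to the full edit record. Because you receive the entire log record in these
methods, your code can influence what is written to the log. For example, in some advanced use cases you could add extra cells to the edit record,
which could then help fine-tune the data reconstruction process when the log is replayed.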
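The following sketch illustrates the idea of adding an extra cell to an edit before it reaches the log. It is a self-contained model, not the HBase API: SketchCell, SketchWALObserver, MarkerCellObserver and SketchLog are hypothetical names, a cell is reduced to a qualifier/value pair, the hooks omit the ObserverContext, region and WALKey arguments of the real WALObserver, and the "__replay_hint" marker is an invented example value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified cell: just a qualifier and a value.
final class SketchCell {
  final String qualifier;
  final String value;

  SketchCell(String qualifier, String value) {
    this.qualifier = qualifier;
    this.value = value;
  }
}

// Hypothetical stand-in for WALObserver with simplified pre/post hooks.
interface SketchWALObserver {
  void preWALWrite(List<SketchCell> edit);
  void postWALWrite(List<SketchCell> edit);
}

// Adds a marker cell that replay code could later use as a hint.
class MarkerCellObserver implements SketchWALObserver {
  public void preWALWrite(List<SketchCell> edit) {
    edit.add(new SketchCell("__replay_hint", "v1"));
  }

  public void postWALWrite(List<SketchCell> edit) { }
}

// Hypothetical log: calls the observer around every write.
class SketchLog {
  final List<List<SketchCell>> entries = new ArrayList<>();
  private final SketchWALObserver observer;

  SketchLog(SketchWALObserver observer) { this.observer = observer; }

  void write(List<SketchCell> edit) {
    List<SketchCell> copy = new ArrayList<>(edit);
    observer.preWALWrite(copy);  // observer sees, and may change, the full edit
    entries.add(copy);           // this is what ends up in the log
    observer.postWALWrite(copy); // observer sees the edit as written
  }
}
```

Writing a single-cell edit through a SketchLog backed by MarkerCellObserver stores a two-cell entry whose second cell is the marker.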
3.10.1 The WALCoprocessorEnvironment Class
-----------------------------------------------------------------------------------------------------------------------------------------
The environment provided as part of the callbacks is a WALCoprocessorEnvironment instance. It also extends CoprocessorEnvironment, so your
code can access data through the getTable() method as well.
Specific method provided by the WALCoprocessorEnvironment class
+-----------------------------------+-------------------------------------------------------------------------------------
| Method | Description
+-----------------------------------+-------------------------------------------------------------------------------------
| getWAL() | Provides access to the shared WAL instance.
+-----------------------------------+-------------------------------------------------------------------------------------
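With the reference to the WAL, you can roll the current writer, in other words, close the current log file and open a new one. You can also call
sync() to force the edit log to be persisted. These are the methods provided by the WAL interface: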
void registerWALActionsListener(final WALActionsListener listener)
boolean unregisterWALActionsListener(final WALActionsListener listener)
byte[][] rollWriter() throws FailedLogCloseException, IOException
byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException
void shutdown() throws IOException
void close() throws IOException
long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
  AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) throws IOException
void sync() throws IOException
void sync(long txid) throws IOException
boolean startCacheFlush(final byte[] encodedRegionName)
void completeCacheFlush(final byte[] encodedRegionName)
void abortCacheFlush(byte[] encodedRegionName)
WALCoprocessorHost getCoprocessorHost()
long getEarliestMemstoreSeqNum(byte[] encodedRegionName)
These are all very low-level methods; see the API documentation for details.
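To illustrate the semantics described above (append returns a transaction id, sync(txid) makes everything up to that id durable, rollWriter() closes the current file and starts a new one), here is a small in-memory model. ToyWal is a hypothetical class, not the HBase WAL implementation: it ignores regions, keys and memstore bookkeeping, omits the byte[][] return value of the real rollWriter(), and assumes that rolling first syncs pending entries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of WAL append/sync/roll semantics.
class ToyWal {
  private final List<List<String>> files = new ArrayList<>();
  private long nextTxid = 1;
  private long syncedTxid = 0;

  ToyWal() { files.add(new ArrayList<>()); }

  // Append to the current file and return the transaction id of the entry.
  long append(String edit) {
    files.get(files.size() - 1).add(edit);
    return nextTxid++;
  }

  // Treat all entries up to and including txid as durable.
  void sync(long txid) {
    if (txid >= nextTxid) throw new IllegalArgumentException("unknown txid " + txid);
    syncedTxid = Math.max(syncedTxid, txid);
  }

  // Sync everything appended so far.
  void sync() {
    if (nextTxid > 1) sync(nextTxid - 1);
  }

  // Close the current file (after syncing it, an assumption of this model) and open a new one.
  void rollWriter() {
    sync();
    files.add(new ArrayList<>());
  }

  long syncedTxid() { return syncedTxid; }
  int fileCount() { return files.size(); }
  int entriesInCurrentFile() { return files.get(files.size() - 1).size(); }
}
```

For instance, after two appends, sync(1) leaves only the first entry durable, while a subsequent rollWriter() syncs the second entry and starts a fresh, empty file.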
3.10.2 The BaseWALObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
The BaseWALObserver class implements the WALObserver interface. You can extend this class or implement the interface directly.
3.11 The BulkLoadObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
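This observer class is triggered during bulk loading operations by the completebulkload tool that ships with HBase. The tool is contained in the
server JAR file. Using the Hadoop JAR support, you can list the available tools: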
bin/hadoop jar /usr/local/hbase-1.0.0-bin/lib/hbase-server-1.0.0.jar
An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in HBase table
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
verifyrep: Compare the data from tables in two different clusters.
WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
Once the completebulkload tool runs, it attempts to load all staged bulk files. During this operation, it triggers the available