1. 程式人生 > 其它 >Hbase 學習(三)Coprocessors

Hbase 學習(三)Coprocessors

Coprocessors 之前我們的filter都是在客戶端定義,然後傳到服務端去執行的,這個Coprocessors是在服務端定義,在客戶端呼叫,然後在服務端執行,他有點兒想我們熟悉的儲存過程,傳一些引數進去,然後進行我們事先定義好的操作,我們常常用它來做一些比如二次索引啊,統計函式什麼的,它也和自定義filter一樣,需要事先定好,然後在hbase-env.sh中的HBASE_CLASSPATH中指明,就像我的上一篇中的寫的那樣。 Coprocessors分兩種,observer和endpoint。 (1)observer就像觸發器一樣,當某個事件發生的時候,它就出發。 已經有一些內建的介面讓我們去實現,RegionObserver、MasterObserver、WALObserver,看名字就大概知道他們是幹嘛的。 (2)endpoint可以認為是自定義函式,可以把這個理解為關係資料庫的儲存過程。 所有的Coprocessor都是實現自Coprocessor 介面,它分SYSTEM和USER,前者的優先順序比後者的優先順序高,先執行。 它有兩個方法,start和stop方法,兩個方法都有一個相同的上下文物件CoprocessorEnvironment。

void start(CoprocessorEnvironment env) throws IOException; 
void stop(CoprocessorEnvironment env) throws IOException;

這是CoprocessorEnvironment的方法。

Working with Tables 對錶進行操作的時候,必須先呼叫getTable方法活得HTable,不可以自己定義一個HTable,目前貌似沒有禁止,但是將來會禁止。 並且在對錶操作的時候,不能對行加鎖。 Coprocessor Loading Coprocessor載入需要在配置檔案裡面全域性載入,比如在hbase-site.xml中設定。


 <property> 
    <name>hbase.coprocessor.region.classes
 </name>
      <value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value>
  </property>
    <property>   
        <name>hbase.coprocessor.master.classes</name>  
     <value>coprocessor.MasterObserverExample</value> 
        </property> 
        <property>  
        <name>hbase.coprocessor.wal.classes</name>  
        <value>coprocessor.WALObserverExample,bar.foo.MyWALObserver</value> </property>

我們自定義的時間可以註冊到三個配置項上,分別是hbase.coprocessor.region.classes,hbase.coprocessor.master.classes, hbase.coprocessor.wal.classes上,他們分別負責region,master,wal,註冊到region的要特別注意小心,因為它是針對所有表的。 <property> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RegionObserverExample</value> </property> 註冊到這三個觸發器上,可以監控到幾乎所有我們的操作上面,非常恐怖。。可以說是想要什麼就有什麼,詳細的程式碼大家自己去摸索。 EndPoint的可以用來定義聚合函式,我們可以呼叫CoprocessorProtocol中的方法來實現我們的需求。 呼叫coprocessorProxy() 傳一個單獨的row key,這是在單獨一個region上操作的。 要在所有region上面操作,我們要呼叫coprocessorExec()方法 傳一個開始row key 和結束row key。

Demo 說了那麼多廢話,我都不好意思再說了,來個例子吧,統計行數的。

public interface RowCountProtocol extends CoprocessorProtocol { long getRowCount() throws IOException; long getRowCount(Filter filter) throws IOException; long getKeyValueCount() throws IOException; } public class RowCountEndpoint extends BaseEndpointCoprocessor implements RowCountProtocol { private long getCount(Filter filter, boolean countKeyValues) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(1); if (filter != null) { scan.setFilter(filter); } RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment(); // use an internal scanner to perform scanning. InternalScanner scanner = environment.getRegion().getScanner(scan); int result = 0; try { List<KeyValue> curVals = new ArrayList<KeyValue>(); boolean done = false; do { curVals.clear(); done = scanner.next(curVals); result += countKeyValues ? curVals.size() : 1; } while (done); } finally { scanner.close(); } return result; } @Override public long getRowCount() throws IOException { return getRowCount(new FirstKeyOnlyFilter()); } @Override public long getRowCount(Filter filter) throws IOException { return getCount(filter, false); } @Override public long getKeyValueCount() throws IOException { return getCount(null, true); } }

寫完之後,註冊一下吧。 <property> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RowCountEndpoint</value> </property>

JAVA 客戶端呼叫 在服務端定義之後,我們怎麼在客戶端用java程式碼呼叫呢,看下面的例子你就明白啦!

public class EndPointExample { public static void main(String[] args) throws IOException { Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "testtable"); try { Map<byte[], Long> results = table.coprocessorExec( RowCountProtocol.class, null, null, new Batch.Call<RowCountProtocol, Long>() { @Override public Long call(RowCountProtocol counter) throws IOException { return counter.getRowCount(); } }); long total = 0; for (Map.Entry<byte[], Long> entry : results.entrySet()) { total += entry.getValue().longValue(); System.out.println("Region: " + Bytes.toString(entry.getKey()) + ", Count: " + entry.getValue()); } System.out.println("Total Count: " + total); } catch (Throwable throwable) { throwable.printStackTrace(); } } }

通過table的coprocessorExec方法呼叫,然後呼叫RowCountProtocol介面的getRowCount()方法。 然後遍歷每個Region返回的結果,合起來就是最終的結果,列印結果如下。 Region: testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f., Count: 2 Region: testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87., Count: 3 Total Count: 5

在上面的例子當中,我們是用Batch.Call()方法來呼叫介面當中的方法,我們可以用另外一個方法來簡化上述程式碼,來看例子。 Batch.Call call =Batch.forMethod(RowCountProtocol.class,"getKeyValueCount"); Map<byte[], Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);

採用Batch.Call方法呼叫同時呼叫多個方法

Map<byte[], Pair<Long, Long>> results =table.coprocessorExec( RowCountProtocol.class, null, null, new Batch.Call<RowCountProtocol, Pair<Long, Long>>() { public Pair<Long, Long> call(RowCountProtocol counter) throws IOException { return new Pair(counter.getRowCount(),counter.getKeyValueCount()); } }); long totalRows = 0; long totalKeyValues = 0; for (Map.Entry<byte[], Pair<Long, Long>> entry :results.entrySet()) { totalRows += entry.getValue().getFirst().longValue(); totalKeyValues +=entry.getValue().getSecond().longValue(); System.out.println("Region: " +Bytes.toString(entry.getKey()) +", Count: " + entry.getValue()); } System.out.println("Total Row Count: " + totalRows); System.out.println("Total KeyValue Count: " +totalKeyValues);

呼叫coprocessorProxy()在單個region上執行 RowCountProtocol protocol = table.coprocessorProxy(RowCountProtocol.class, Bytes.toBytes("row4")); long rowsInRegion = protocol.getRowCount(); System.out.println("Region Row Count: " +rowsInRegion); 上面這個例子是查詢row4行所在region的資料條數,這個可以幫助我們統計每個region上面的資料分佈。