Hbase 過濾器詳解及一些程式碼測試示例
一、過濾器(Filter)
基礎API中的查詢操作在面對大量資料的時候是非常蒼白的,這裡Hbase提供了高階的查詢方法:Filter。Filter可以根據簇、列、版本等更多的條件來對資料進行過濾,基於Hbase本身提供的三維有序(主鍵有序、列有序、版本有序),這些Filter可以高效的完成查詢過濾的任務。帶有Filter條件的RPC查詢請求會把Filter分發到各個RegionServer,是一個伺服器端(Server-side)的過濾器,這樣也可以降低網路傳輸的壓力。
要完成一個過濾的操作,至少需要兩個引數。一個是抽象的操作符,Hbase提供了列舉型別的變數來表示這些抽象的操作符:LESS/LESS_OR_EQUAL/EQUAL/NOT_EUQAL等;另外一個就是具體的比較器(Comparator),代表具體的比較邏輯,如果可以提高位元組級的比較、字串級的比較等。有了這兩個引數,我們就可以清晰的定義篩選的條件,過濾資料。
- CompareFilter(CompareOp compareOp, WritableByteArrayComparable valueComparator)
CompareFilter是高層的抽象類,下面我們將看到它的實現類和實現類代表的各種過濾條件。這裡實現類實際上代表的是引數中的過濾器過濾的內容,可以使主鍵、簇名、列值等,這就是由CompareFilter決定了。
行過濾器(RowFilter)
行過濾器的比較物件是行主鍵
Java程式碼- Scan scan = new Scan();
-
Filter filter1 = new
- scan.setFilter(filter1);
- scan.close();
例中的Filter會將所有的小於等於“Hello”的主鍵過濾出來。
簇過濾器(FamilyFilter)
簇過濾器過濾的是簇的名字。
列過濾器(QualifierFilter)
列過濾器過濾的是列的名字。
值過濾器(ValueFilter)
值過濾器過濾的是掃描物件的值。
單值過濾器(SingleColumnValueFilter)
單值過濾器是以特定列的值為過濾內容,與值過濾器不同的是,這裡是特定的列,而值過濾器比較的是行內的所有列。所有在使用單值過濾器的時候要指定比較的列的座標。
Java程式碼- SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareOp compareOp, WritableByteArrayComparable comparator)
對於找不到該列的行,可以有特殊的處理
Java程式碼- void setFilterIfMissing(boolean filterIfMissing)
預設預設行將被包含進過濾的結果集中。
字首過濾器(PrefixFilter)
字首過濾器將會過濾掉不匹配的記錄,過濾的物件是主鍵的值。
Java程式碼- PrefixFilter(byte[] prefix)
頁過濾器(PageFilter)
頁過濾器可以根據主鍵有序返回固定數量的記錄,這需要客戶端在遍歷的時候記住頁開始的地方,配合scan的startkey一起使用。
Java程式碼- PageFilter(int size)
鍵過濾器(KeyOnlyFilter)
鍵過濾器可以簡單的設定過濾的結果集中只包含鍵而忽略值,這裡有一個選項可以把結果集的值儲存為值的長度。
FirstKeyOnlyFilter
在鍵過濾器的基礎上,根據列有序,只包含第一個滿足的鍵。
ColumnPrefixFilter
這裡過濾的物件是列的值。
TimestampsFilter
Java程式碼- TimestampsFilter(List<Long> times)
這裡引數是一個集合,只有包含在集合中的版本才會包含在結果集中。
包裝類過濾器,此類過濾器要通過包裝其他的過濾器才有意義,是其他過濾器的一種加強。
SkipFilter
Java程式碼- SkipFilter(Filter filter)
過濾器集合(FilterList)
Hbase的過濾器設計遵照於設計模式中的組合模式,以上的所有過濾器都可以疊加起來共同作用於一次查詢。
二、計數器(Counter)
Hbase提供一個計數器工具可以方便快速的進行計數的操作,而免去了加鎖等保證原子性的操作。但是實質上,計數器還是列,有自己的簇和列名。值得注意的是,維護計數器的值最好是用Hbase提供的API,直接操作更新很容易引起資料的混亂。
計數器的增量可以是正數負數,正數代表加,負數代表減。
Java程式碼- long icrementColumnValue(byte[] row, byte[] famuly, byte[] qualifier, long amount)
- Result increment(Increment increment)
三、協處理器(Coprocessor)
協處理器的思想是把處理的複雜程式碼分發到各個RegionServer,使大部分的計算可以在伺服器端,或者掃描的時候完成,提高處理的效率。形式上比較類似RDBMS中的儲存過程,不同的是,儲存過程的原理是在伺服器端進行預處理等優化,而協處理器僅僅只是伺服器處理,這裡又有點類似於Map-Reduce中的Map階段。
協處理器(Coprocesssor)有兩種,一種是觀察者(Obsever)另外一種是Endpoint(LZ跪了,實在不知道翻譯成啥)。
每個協處理器都有一個優先順序,優先順序分為USER/SYSTEM,優先順序決定處理器的執行順序,SYSTEM級別的處理器永遠先於USER。
每個處理器都有自己的執行環境(CoprocessorEnvironment),這個環境包含當前叢集和請求的狀態等資訊,是處理中重要的一部分,以建構函式引數的形式被傳入到處理器。
另外就是CoprocessorHost,這是Hbase管理協處理器的類,用來維護所有的處理器和其環境。
抽象如圖:
協處理器的載入有兩種方式,一種是通過配置檔案,在配置檔案中指定載入路徑、類名等,通過這種方式載入的處理器都是SYSTEM級別的,會作用於所有的請求,所有的表;另一種方式是通過在建立表的時候在表中指定,這種方式既可以建立全域性的SYSTEM級別的處理器,也可以建立USER級別的處理器,USER級別的處理器是針對表的。
- Path path = new Paht("test.jar");
- HTableDescriptor htd = new HTableDescriptor("test");
- htd.addFamily(new HColumnDescriptor("family1"));
- htd.setValue("Coprocessor$1", path.toString + "|" + className + "|" + Coprocessor.Priority.USER);
- HBaseAdmin admin = new HBaseAdmin(conf);
- admin.createTable(htd);
這裡setValue方法有兩個引數,第一個引數是協處理器的名字,$後面跟的是影響執行順序的序號;第二個引數是<path>|<classname>|<priority>。
Observer
這是第一種處理器,觀察者,觀察者有三種,分別用來監聽RegionServerObserver、MasterServerObserver、WALObserver。
RegionServer監聽的是Region Server上的操作,如在Region Server上的Get、Put等。操作被賦予生命週期:Pending open--open--Pending close
監聽器是可以監聽生命週期中的各個階段,並對其做出處理。
每一個監聽的方法都有一個上下文引數(Context),通過Context引數可以直接的操作請求的宣告週期。
Java程式碼- void bypass();
- void complete();
MasterObserver監聽的是Master Server上的操作,有點類似RDBMS中的DDL的操作如表操作、列操作等。
具體的操作和RegionServer比較類似。
Endpoint
這是第二種處理器,Endpoint相當於被分發到各個RegionServer上的儲存過程,可以在客戶端遠端呼叫的方法。Endpoint的存在使我們可以進行一些伺服器端的計算,如伺服器聚集、求和等運算,彌補了查詢API的不足。伺服器端計算的優勢是顯而易見的,它可以降低網路傳輸的資料量,合理利用伺服器資源。
從功能上可以看出Endpoint是一個基於RPC呼叫的模組,所以在實現自己的Endpoint時候需要定義我們自己的通訊協議。在Hbase中,通訊協議被抽象為CoprocessorProtocol介面,要實現我們的協議,我們要建立協議介面繼承自CoprocessorProtocol介面,然後再實現我們的協議類。
Java程式碼- public interface MyProtocol extends CoprocessorProtocol {
- public int work();
- }
協議類本身也是處理器,所以還要繼承BaseEndpointCoprocessor類。
Java程式碼- public class MyEndpoint extends BaseEndpointCoprocessor implements MyProtocol {
- public int work() {
- Sytem.out.println("hello");
- }
- }
在抽象的父類BaseEndpointCoprocessor中還提供了一些有用的方法,如我們可以拿到對應的環境類。
Java程式碼- RegionCoprocessorEnvironment getEnvironment()
配置好Endpoint重啟叢集環境以後,我們的實現類會被分發到各個RegionServer,通過HTable例項的方法我們可以呼叫到Endpoint。
Java程式碼- <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable);
startKey和endKey用於確定哪些RegionServer將執行Endpoint, Batch中的內部類將決定協議中方法的呼叫。
四、 HTablePool 連線池
在Hbase中,建立一個代表表的HTable例項是一個耗時且很佔資源的操作,類似操作資料庫,我們也需要建立我們自己的連線池,於是有了代表連線池的抽象類:HTable。
Java程式碼- HTablePool(Configuaration conf, int maxSize)
- HTablePool(Configuaration conf, int maxSize, HTableInterfaceFactory factory)
建立HTable需要配置檔案的例項,連線池的最大連線數也在構造方法中設定。另外,如果想要自己控制HTable被建立的過程,則需要實現自己的工廠方法。在連線池中,最大連線數(maxSize)的含義是,連線池管理的最大的連線數,當所需要的連線數超過最大值時,會臨時的建立連線來滿足需求,但是這些連線在使用完畢之後會被直接釋放且丟棄而不會進入連線池被管理,所以最大連線數代表的是連線池中最大被管理的連線數,而不是使用連線池最大可使用的連線數。
Java程式碼- HTableInterface getTable(String tableName)
- HTableInterface getTable(byte[] tableName)
- void putTable(HTableInterface table)
需要注意的是,使用完連線以後需要手動的呼叫putTable方法將連線放回池中。
附加上述過濾器的一些測試程式碼示例:
package com.test.junit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
public class JunitHbaseTest {
private static org.apache.hadoop.conf.Configuration conf = null;
private static HConnection connection = null;
private static HBaseAdmin admin = null;
private static final int MAX_TABLE_COUNT = 10;
static {
Configuration HBASE_CONFIG = new Configuration();
HBASE_CONFIG.set("hbase.master", "192.168.174.129:60000");
HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.174.129,192.168.174.130,192.168.174.131");
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
conf = HBaseConfiguration.create(HBASE_CONFIG);
try {
connection = HConnectionManager.createConnection(HBASE_CONFIG);
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
admin = new HBaseAdmin(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void putTest() throws IOException, InterruptedException{
HTableInterface table = connection.getTable("myTest");
table.setAutoFlush(false);//關閉自動提交
List<Put> puts = new ArrayList<Put>(10);
for (int i = 40,len=50; i < len; i++) {
Put put = new Put(Bytes.toBytes("row-"+i),new Date().getTime());
put.add(Bytes.toBytes("student"), Bytes.toBytes("name1"), Bytes.toBytes("value"+i));
put.add(Bytes.toBytes("student"), Bytes.toBytes("age"), Bytes.toBytes(i+""));
put.add(Bytes.toBytes("student"), Bytes.toBytes("grade"), Bytes.toBytes("grade"+i));
puts.add(put);
put.heapSize();
put.size();
//下面的方法 從字面上基本上就可以知道
put.isEmpty();
put.getRow();
put.numFamilies();
}
table.put(puts);
table.flushCommits();//手動提交,只有當put總大小超過writeBufferSize 才提交 或者手工,table.flushCommits()
//table.setWriteBufferSize(1024*1024*5);//如果不設定 table.flushCommits(),只有達到達到這個大小的時候才能自動提交
admin.flush("myTest");
//批量操作方法一, 使用batch,可以混合各種操作 ( Put Delete Get 都是介面Row的實現)
//主要 這個如果處理Put操作 是不會使用客戶端快取的 會直接非同步的傳送到伺服器端
/*List<Row> rows = new ArrayList<Row>(10);
for (int i = 20,len=30; i < len; i++) {
Put put = new Put(Bytes.toBytes(("row-"+i)));
put.add(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes(("value"+i)));
put.add(Bytes.toBytes("data"), Bytes.toBytes("email"), Bytes.toBytes(("value"+i+"@sina.com")));
rows.add(put);
}
rows.add(new Delete(Bytes.toBytes("row-9")));
table.batch(rows); */
table.close();
}
@Test
public void getAllRecords(){
String tableName = "myTest";
try{
HTable table = new HTable(conf, tableName);
Scan s = new Scan();
ResultScanner ss = table.getScanner(s);
for(Result r:ss){
for(KeyValue kv : r.raw()){
//System.out.println(new String(kv.getRow()));//rowkey
//System.out.println(new String(kv.getFamily()));//family
System.out.println(new String(kv.getQualifier()));//qualifier
/*if((new String(kv.getQualifier())).equals("teacher")){
System.out.println(new String(kv.getValue()));
}*/
}
}
} catch (IOException e){
e.printStackTrace();
}
}
/**
* Delete與Put一致 把全部的Put改成Delete table.put -->table.delete 就可以了,
* 不過有些需要注意, 看下面
* @throws IOException
*/
@Test public void deleteTest() throws IOException{
HTableInterface table = connection.getTable("myTable");
try {
//如果上面介紹的KeyValue 有點印象, 通過delete提供的建構函式可以知道
//不指定會刪除所有的版本
Delete delete = new Delete(Bytes.toBytes("row-1"));
table.delete(delete);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 一些原子性操作 對於java併發工具包有所瞭解的 應該會知道 輕量級鎖的核心就是CAS機制(Compare and swap),
* 這裡在概念上有些類似, 也可以類似於 SQL中 select 出來然後 insert or update的 操作 Hbase這裡可以保證他們在一個原子操作
* 這個在高併發 場景下 更新值 是個好的選擇
* table.checkAndPut(row, family, qualifier, value, put)
* table.checkAndDelete(row, family, qualifier, value, delete)
* @throws IOException
*/
@Test public void atomicOP() throws IOException{
byte[] row = Bytes.toBytes("row-12");
byte[] family = Bytes.toBytes("data");
HTableInterface table = connection.getTable("myTable");
//操作成功會返回 true,否則false; 如果是個不存在的qualifier, 把value置為null check是會成功的
Put put = new Put(row);
put.add(family, Bytes.toBytes("namex"), Bytes.toBytes("value12"));
//check 和put是同一個row
boolean result1 = table.checkAndPut(row, family, Bytes.toBytes("namex"), null, put); //true
boolean result2 = table.checkAndPut(row, family, Bytes.toBytes("namex"), null, put); //false
Put put2 = new Put(row);
put2.add(family, Bytes.toBytes("namex"), Bytes.toBytes("value12"));
boolean result3 = table.checkAndPut(row, family, Bytes.toBytes("namex"),
Bytes.toBytes("value12"), put2); //true
Put put3 = new Put(Bytes.toBytes("row-13"));
put3.add(family, Bytes.toBytes("namex"), Bytes.toBytes("value13"));
boolean result4 = table.checkAndPut(row, family, Bytes.toBytes("namex2"),
Bytes.toBytes("value12"), put3); //org.apache.hadoop.hbase.DoNotRetryIOException
//注意:check 和put的一定要是同一行 否則會報錯
// table.checkAndDelete類似
}
@Test
public void testAddTable() throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
String tableName = "myTest";
String[] familys = {"grade", "course"};
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists("myTest")) {
System.out.println("table already exists!");
} else {
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
for(int i=0; i<familys.length; i++){
tableDesc.addFamily(new HColumnDescriptor(familys[i]));
}
admin.createTable(tableDesc);
System.out.println("create table " + tableName + " ok.");
}
}
@Test
public void testAddRecord() throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
String rowKey = "20160201113312";
String tableName = "myTest";
String gradeFamily = "grade";
String qualifier = "middle";
String value = "2";
try {
HTable table = new HTable(conf, tableName);
Put put = new Put(Bytes.toBytes(rowKey));//將字串轉化成位元組陣列
put.add(Bytes.toBytes(gradeFamily),Bytes.toBytes(qualifier),Bytes.toBytes(value));
table.put(put);
System.out.println("insert recored " + rowKey + " to table " + tableName +" ok.");
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testGetOneRecord() throws IOException{
String rowKey = "20160201113312";
String tableName = "myTest";
String gradeFamily = "grade";
String qualifier = "middle";
String value = "2";
HTable table = new HTable(conf, tableName);
Get get = new Get(rowKey.getBytes());//Get一般是獲取一條記錄
Result rs = table.get(get);
for(KeyValue kv : rs.raw()){
System.out.print(new String(kv.getRow()) + " " );
System.out.print(new String(kv.getFamily()) + ":" );
System.out.print(new String(kv.getQualifier()) + " " );
System.out.print(kv.getTimestamp() + " " );
System.out.println(new String(kv.getValue()));
}
}
@Test
public void deleteOneRecord() throws IOException{
String tableName = "myTest";
String rowKey = "20160201113312";
HTable table = new HTable(conf, tableName);
List list = new ArrayList();
Delete del = new Delete(rowKey.getBytes());
list.add(del);
table.delete(list);
System.out.println("del recored " + rowKey + " ok.");
}
@Test
public void deleteSomeRecord() throws IOException{
String tableName = "myTest";
String rowKey = "20160201113312";
HTable table = new HTable(conf, tableName);
List list = new ArrayList();
Delete del = new Delete(rowKey.getBytes());
list.add(del);
table.delete(list);
System.out.println("del recored " + rowKey + " ok.");
}
@Test
public void scanAdvance() throws IOException{
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
List<Filter> rootList = new ArrayList<Filter>();
List<Filter> selectList = new ArrayList<Filter>();
List<Filter> select_1 = new ArrayList<Filter>();
select_1.add(new FamilyFilter(CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes("teacher"))));
select_1.add(new QualifierFilter(CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes("age"))));
//List<Filter> select_2 = new ArrayList<Filter>();
//select_2.add(new FamilyFilter(CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes("cf2"))));
//select_2.add(new QualifierFilter(CompareOp.EQUAL,new BinaryPrefixComparator(Bytes.toBytes("column"))));
selectList.add(new FilterList(Operator.MUST_PASS_ALL, select_1));
//selectList.add(new FilterList(Operator.MUST_PASS_ALL, select_2));
rootList.add(new FilterList(Operator.MUST_PASS_ONE,selectList));
List<Filter> whereList = new ArrayList<Filter>();
//whereList.add(new RowFilter(CompareOp.GREATER,new BinaryComparator(Bytes.toBytes("7"))));
whereList.add(new ColumnRangeFilter(Bytes.toBytes("12"), true, Bytes.toBytes("13"), true));
//whereList.add(new RowFilter(CompareOp.EQUAL,new BinaryPrefixComparator(Bytes.toBytes("xxx"))));
rootList.add(new FilterList(Operator.MUST_PASS_ALL,whereList));
scan.setFilter(new FilterList(Operator.MUST_PASS_ALL, rootList));
ResultScanner ss = table.getScanner(scan);
for(Result r:ss){
for(KeyValue kv : r.raw()){
System.out.print(new String(kv.getRow()) + " ");
System.out.print(new String(kv.getFamily()) + ":");
System.out.print(new String(kv.getQualifier()) + " ");
//System.out.print(kv.getTimestamp() + " ");
System.out.println(new String(kv.getValue()));
}
}
}
@Test
public void ColumnCountGetFilter() throws IOException{
HTableInterface table = connection.getTable("myTest");
Get get=new Get(Bytes.toBytes("row-11"));
ColumnCountGetFilter filter=new ColumnCountGetFilter(2);
get.setFilter(filter);
Result result=table.get(get);
System.out.println(result.size());
}
@Test
public void ColumnPaginationGetFilter() throws IOException{
HTableInterface table = connection.getTable("myTest");
Scan scan=new Scan();
ColumnPaginationFilter filter=new ColumnPaginationFilter(1, 2);
scan.setFilter(filter);
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getQualifier()));
}
}
resultScanner.close();
}
@Test
public void RowFilter() throws IOException{//行過濾器過濾的是主鍵
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-23")));
scan.setFilter(filter1);
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getValue()));
}
}
resultScanner.close();
}
@Test
public void FamilyFilter() throws IOException{//行過濾器過濾的是主鍵
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
Filter filter1 = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("student")));
scan.setFilter(filter1);
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getValue()));
}
}
resultScanner.close();
}
@Test
public void RowFamilyFilter() throws IOException{//行過濾器過濾的是主鍵
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
Filter filter1 = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("student")));
Filter filter2 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-11")));
filterList.add(filter1);
filterList.add(filter2);
scan.setFilter(new FilterList(Operator.MUST_PASS_ALL, filterList));
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getValue()));
}
}
resultScanner.close();
}
@Test
public void QualifierFilter() throws IOException{//行過濾器過濾的是主鍵
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
Filter filter1 = new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("grade")));
filterList.add(filter1);
scan.setFilter(new FilterList(Operator.MUST_PASS_ALL, filterList));
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getValue()));
}
}
resultScanner.close();
}
@Test
public void ValueFilter() throws IOException{//行過濾器過濾的是主鍵
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
Filter filter1 = new ValueFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("professor34")));
filterList.add(filter1);
scan.setFilter(new FilterList(Operator.MUST_PASS_ALL, filterList));
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getValue()));
}
}
resultScanner.close();
}
@Test
public void PrefixFilter() throws IOException{//主鍵的前序匹配
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
PrefixFilter filter1 = new PrefixFilter(Bytes.toBytes("row-2"));
filterList.add(filter1);
scan.setFilter(new FilterList(Operator.MUST_PASS_ALL,filterList));
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getValue()));
}
}
resultScanner.close();
}
@Test
public void ColumnPrefixFilter() throws IOException{//強過濾條件定位到指定的列
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
ColumnPrefixFilter filter1 = new ColumnPrefixFilter(Bytes.toBytes("age"));
filterList.add(filter1);
scan.setFilter(new FilterList(Operator.MUST_PASS_ALL,filterList));
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getValue()));
}
}
resultScanner.close();
}
@Test
public void ColumnPrefixRangeFilter() throws IOException{//強過濾條件定位到指定的列
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
FamilyFilter filter1 = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("student")));
ColumnRangeFilter filter2 = new ColumnRangeFilter(Bytes.toBytes("name"),true,Bytes.toBytes("name1"),true);
filterList.add(filter1);
filterList.add(filter2);
scan.setFilter(new FilterList(Operator.MUST_PASS_ALL,filterList));
ResultScanner resultScanner=table.getScanner(scan);
for(Result result:resultScanner){
for(KeyValue kv:result.raw()){
System.out.println(kv+"-----"+Bytes.toString(kv.getValue()));
}
}
resultScanner.close();
}
@Test
public void PageFilter() throws IOException{//強過濾條件定位到指定的列
HTableInterface table = connection.getTable("myTest");
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
FamilyFilter filter1 = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("student")));
PageFilter filter2 = new PageFilter(10);
filterList.add(filter1);
filterList.add(filter