1. 程式人生 > >Hbase過濾器(二):專用過濾器API

Hbase過濾器(二):專用過濾器API

目錄

一:單列值過濾器(SingleColumnValueFilter)

方法類:

SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value)

SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, WritableByteArrayComparable comparator)

boolean getFilterIfMissing()

void setFilterIfMissing(boolean filterIfMissing)----設定為true時,不包含參考列的行會被整行過濾掉;預設為false,不包含參考列的行會原樣保留在結果中

boolean getLatestVersionOnly()

void setLatestVersionOnly(boolean latestVersionOnly)----設定為true時,只檢查參考列最近版本

Java API:

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class singleColumnValueFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * SingleColumnValueFilter demo: keeps only rows whose cf1:name value does
     * NOT contain the substring "csn". Rows that lack the reference column are
     * dropped entirely via setFilterIfMissing(true).
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void singleColumnValueFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        // try-with-resources + finally guarantee table/scanner/connection are
        // released even when the scan throws (original leaked the connection).
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            /* Reference column of the filter:
             *   cf1  ---- column family
             *   name ---- column qualifier
             */
            SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                    Bytes.toBytes("cf1"), Bytes.toBytes("name"),
                    CompareFilter.CompareOp.NOT_EQUAL, new SubstringComparator("csn"));
            // true: a row with no cf1:name cell is filtered out instead of kept.
            singleColumnValueFilter.setFilterIfMissing(true);
            Scan scan = new Scan();
            scan.setFilter(singleColumnValueFilter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

二:單列排除過濾器(SingleColumnValueExcludeFilter)

單列排除過濾器繼承自單列值過濾器(SingleColumnValueFilter),唯一不同於單列值過濾器的是:Result例項中永遠不會包含作為檢查目標的參考列。

Java API:

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class singleColumnValueExcludeFilter {

    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * SingleColumnValueExcludeFilter demo: same row-matching rule as
     * SingleColumnValueFilter (cf1:name must not contain "csn"), but the
     * reference column cf1:name itself is stripped from every Result.
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void singleColumnValueExcludeFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            /* Reference column of the filter:
             *   cf1  ---- column family
             *   name ---- column qualifier
             */
            SingleColumnValueExcludeFilter singleColumnValueExcludeFilter =
                    new SingleColumnValueExcludeFilter(
                            Bytes.toBytes("cf1"), Bytes.toBytes("name"),
                            CompareFilter.CompareOp.NOT_EQUAL, new SubstringComparator("csn"));
            Scan scan = new Scan();
            scan.setFilter(singleColumnValueExcludeFilter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    // Guard against rows with no cf1:age cell; the original
                    // called Bytes.toInt(null) and would throw NPE.
                    byte[] ageBytes = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age"));
                    Integer age = (ageBytes == null) ? null : Bytes.toInt(ageBytes);
                    System.out.println(age);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

三:字首過濾器(PrefixFilter)

字首過濾器是通過給定的rowkey字首匹配出固定的行----字首匹配只用於scan中,get作用不大

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class prefixFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * PrefixFilter demo: returns only the rows whose row key starts with the
     * given prefix. Prefix matching is only meaningful for scans, not gets.
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void prefixFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            // "row3" is the row-key prefix to match.
            Filter filter = new PrefixFilter(Bytes.toBytes("row3"));
            Scan scan = new Scan();
            scan.setFilter(filter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

四:分頁過濾器(PageFilter)

分頁過濾器:對HBase中的資料,按照所設定的一頁的行數,進行分頁。

注:如果先指定了PageFilter..後指定SingleColumnValueFilter的話..條件正確但是獲取不到資料..

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class pageFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * PageFilter demo: scans the table two rows per page, restarting each scan
     * just past the last row key of the previous page until no rows remain.
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void pageFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        // Original never closed the table or the connection; close both here.
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            // At most 2 rows per page.
            Filter filter = new PageFilter(2);
            int totalRows = 0;
            // Row key of the last row seen on the previous page (null on page 1).
            byte[] lastRow = null;
            // A single zero byte: the smallest possible row-key suffix, so
            // lastRow + POSTFIX is the first key strictly after lastRow.
            byte[] POSTFIX = new byte[] { 0x00 };
            while (true) {
                Scan scan = new Scan();
                scan.setFilter(filter);
                // Resume the next page immediately after the previous last row.
                if (lastRow != null) {
                    byte[] startRow = Bytes.add(lastRow, POSTFIX);
                    // Original printed the raw array reference (identity hash);
                    // print only the readable form.
                    System.out.println(Bytes.toStringBinary(startRow));
                    scan.setStartRow(startRow);
                }
                ResultScanner resultScanner = table.getScanner(scan);
                int localRows = 0;
                Result result;
                try {
                    while ((result = resultScanner.next()) != null) {
                        System.out.println(localRows++ + ":" + result);
                        totalRows++;
                        lastRow = result.getRow();
                    }
                } finally {
                    resultScanner.close();
                }
                // An empty page means the table is exhausted.
                if (localRows == 0)
                    break;
            }
            System.out.println("total rows:" + totalRows);
        } finally {
            connection.close();
        }
    }
}

五:行鍵過濾器(KeyOnlyFilter)

在一些應用中只需要將結果中keyValue例項的鍵返回,而不需要返回實際的資料時使用

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class keyOnlyFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * KeyOnlyFilter demo: returns only the keys of each KeyValue, not the data.
     * (Method kept as {@code prefixFilter} for compatibility; the name is a
     * copy-paste leftover from the PrefixFilter example.)
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void prefixFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            // KeyOnlyFilter(lenAsVal): false strips every cell value (empty
            // value returned); true replaces the value with its original
            // length instead. The original comment had this backwards.
            KeyOnlyFilter filter = new KeyOnlyFilter(false);
            Scan scan = new Scan();
            scan.setFilter(filter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    // With lenAsVal=false the value is empty, so this prints "".
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

六:首次行鍵過濾器(FirstKeyOnlyFilter)

如果使用者需要訪問一行中的第一列(HBase隱式排序),則這種過濾器可以滿足需求,這種過濾器通常在行計數統計(row counter)的應用場景中使用,這種場景只需要檢查這一行是否存在。

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class firstKeyOnlyFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * FirstKeyOnlyFilter demo: each row contributes only its first column
     * (in HBase's implicit sort order). Typically used for row counting,
     * where only row existence matters.
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void firstKeyOnlyFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            FirstKeyOnlyFilter filter = new FirstKeyOnlyFilter();
            Scan scan = new Scan();
            scan.setFilter(filter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    // Prints null unless cf1:name happens to be the row's
                    // first column.
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

七:包含結束的過濾器(InclusiveStopFilter)

掃描操作中的開始行被包含到結果中,但終止行被排除在外,使用這個過濾器時,使用者也可以將結束行包括到結果中。

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class inclusiveStopFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * InclusiveStopFilter demo: unlike a plain scan stop row (exclusive),
     * this filter includes the stop row "row3" in the results.
     * (Method kept as {@code firstKeyOnlyFilter} for compatibility; the name
     * is a copy-paste leftover from the FirstKeyOnlyFilter example.)
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void firstKeyOnlyFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            // Scan stops AFTER emitting "row3" (inclusive stop).
            Filter filter = new InclusiveStopFilter(Bytes.toBytes("row3"));
            Scan scan = new Scan();
            scan.setFilter(filter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

八:時間戳過濾器(TimestampsFilter)

使用者需要在掃描結果中對版本進行細粒度的控制時,這個過濾器可以滿足需求----TimestampsFilter(List<Long> timestamps)

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class timestampsFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * TimestampsFilter demo: restricts results to cells whose timestamp is in
     * the given list, combined with a scan-level time range.
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void timestampsFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {

            List<Long> ts = new ArrayList<Long>();
            // Filter by exact timestamps; uncomment to select a specific version.
            // NOTE(review): with ts left empty, TimestampsFilter matches no
            // cell at all, so the scan below returns nothing — confirm intent.
//        ts.add(new Long(1522469029503l));
            TimestampsFilter filter = new TimestampsFilter(ts);
            Scan scan = new Scan();
            scan.setFilter(filter);
            // Additionally restrict versions by time range [min, max).
            scan.setTimeRange(1522469029503L, 1522479029503L);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

九:列計數過濾器(ColumnCountGetFilter)

使用者可以使用這個過濾器來限制每行最多取回多少列----ColumnCountGetFilter(int n)

當一行的列數達到設定的最大值時,這個過濾器會停止整個掃描操作----比較適合在get()方法中使用

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import java.io.IOException;

public class ColumnCountGetFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * ColumnCountGetFilter demo: limits each row to at most 2 columns. When a
     * row reaches the limit the whole scan stops, so this filter is better
     * suited to get() than to scans.
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void ColumnCountGetFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            // Fully-qualified name is required: this demo class shadows the
            // HBase filter class of the same name. 2 = max columns per row.
            org.apache.hadoop.hbase.filter.ColumnCountGetFilter filter =
                    new org.apache.hadoop.hbase.filter.ColumnCountGetFilter(2);
            Scan scan = new Scan();
            scan.setFilter(filter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

十:列分頁過濾器(ColumnPaginationFilter)

ColumnPaginationFilter(int limit,int offset)----建構函式中的兩個數字是用來規定返回資料的起始位置和返回資料數量的。

limit----每次返回limit個列

offset----從第offset個列開始返回,偏移量

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class columnPaginationFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * ColumnPaginationFilter demo: for each row, skips the first 2 columns
     * (offset) and returns at most the next 3 (limit).
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void columnPaginationFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            // ColumnPaginationFilter(limit=3, offset=2).
            ColumnPaginationFilter columnPaginationFilter = new ColumnPaginationFilter(3, 2);
            Scan scan = new Scan();
            scan.setFilter(columnPaginationFilter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

十一:列字首過濾器(ColumnPrefixFilter)

類似於PrefixFilter,這個過濾器通過對列名稱進行字首匹配過濾

ColumnPrefixFilter(byte[] prefix)

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class columnPrefixFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * ColumnPrefixFilter demo: analogous to PrefixFilter but matches on column
     * qualifiers — only columns whose name starts with "na" are returned.
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void columnPrefixFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            // "na" is the column-qualifier prefix (matches e.g. "name").
            ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("na"));
            Scan scan = new Scan();
            scan.setFilter(columnPrefixFilter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}

十二:隨機行過濾器(RandomRowFilter)

RandomRowFilter(float chance)

chance:取值在0.0-1.0之間

1.當chance賦一個負值會導致所有結果都被過濾掉

2.當chance大於1.0則結果集中包含所有行

package specialFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

public class randomRowFilter {
    private Configuration configuration = null;
    private Connection connection = null;

    /**
     * RandomRowFilter demo: each row is kept with probability "chance" (0.4).
     * A negative chance drops every row; a chance above 1.0 keeps every row.
     *
     * @throws IOException if connecting to HBase or scanning fails
     */
    @Test
    public void randomRowFilter() throws IOException {
        System.out.println("begin");
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        try (Table table = connection.getTable(TableName.valueOf("ns1:t1"))) {
            // Float literal replaces the deprecated boxing ctor new Float(0.4).
            RandomRowFilter randomRowFilter = new RandomRowFilter(0.4f);
            Scan scan = new Scan();
            scan.setFilter(randomRowFilter);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    System.out.println(result);
                    String name = Bytes.toString(
                            result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")));
                    System.out.println(name);
                }
            }
        } finally {
            connection.close();
        }
        System.out.println("end");
    }
}