
HBase coprocessors

Tags: hbase

A note up front: if attaching a coprocessor to a table fails, I recommend dropping the table, recreating it, and then attaching the coprocessor again. In my case the region kept loading a cached copy of the coprocessor, although that may also have been an issue with my environment.
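
If you prefer to script that drop-and-recreate clean-up instead of typing it in the HBase shell, a minimal Java sketch could look like the following. It is a hypothetical helper class, assuming the HBase 2.x client API, a table named 'star' with a single column family 'f', and the same ZooKeeper quorum used later in this post; adjust the names to your schema.

package com.ws.coprocessor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class RecreateStarTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "dream1:2181,dream2:2181,dream3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName star = TableName.valueOf("star");
            // Drop the old table so no stale coprocessor definition is left behind.
            if (admin.tableExists(star)) {
                if (admin.isTableEnabled(star)) {
                    admin.disableTable(star);
                }
                admin.deleteTable(star);
            }
            // Recreate the table with a single column family 'f' (assumed schema).
            admin.createTable(TableDescriptorBuilder.newBuilder(star)
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
                    .build());
        }
    }
}

After recreating the table, re-attach the coprocessor with the alter command shown further down.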

package com.ws.coprocessor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;

import java.io.IOException;
import java.util.Optional;

/**
 * RegionObserver that intercepts every Put on the 'star' table and writes the
 * inverse mapping (value -> row key) into the 'fans' table.
 */
public class myCoprocess implements RegionCoprocessor, RegionObserver {

    private static final Log LOG = LogFactory.getLog(myCoprocess.class);

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    // Load with:
    // alter 'star', METHOD => 'table_att', 'Coprocessor'=>'hdfs://dream1:9000/starcop/starcop.jar|com.ws.coprocessor.myCoprocess|100|'
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "dream1:2181,dream2:2181,dream3:2181");
        // Note: opening a new connection for every Put is expensive; see the variant after this class.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table fans = conn.getTable(TableName.valueOf("fans"))) {
            CellScanner cellScanner = put.cellScanner();
            if (cellScanner.advance()) {
                // Mirror the first cell of the incoming Put.
                Cell current = cellScanner.current();
                byte[] star = CellUtil.cloneRow(current);
                LOG.error("incoming row key: " + Bytes.toString(star));
                byte[] mine = CellUtil.cloneValue(current);
                LOG.error("incoming value: " + Bytes.toString(mine));

                // In 'fans' the row key is the value of the original Put; the original
                // row key is stored as both qualifier and value under family 'f'.
                Put fs = new Put(mine);
                fs.addColumn(Bytes.toBytes("f"), star, star);
                fans.put(fs);
            } else {
                // Empty Put: write a marker row so the problem is visible in 'fans'.
                Put fs = new Put(Bytes.toBytes("erro"));
                fs.addColumn(Bytes.toBytes("f"), Bytes.toBytes("erro"), Bytes.toBytes("erro"));
                fans.put(fs);
            }
        }
    }
}
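
For reference: the version above opens a brand-new client connection (including a ZooKeeper lookup) on every single Put, which is costly inside a region server. A lighter sketch, assuming HBase 2.x, reuses the connection the region server already holds via RegionCoprocessorEnvironment.getConnection(); the 'fans' table and family 'f' are the same assumptions as above, and the server-owned connection must not be closed. This is only a drop-in replacement for the prePut method of the class above, not a complete listing.

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
                       WALEdit edit, Durability durability) throws IOException {
        // Reuse the region server's cluster connection instead of creating one per Put.
        // Do NOT close this connection; it is owned by the server.
        Connection conn = c.getEnvironment().getConnection();
        try (Table fans = conn.getTable(TableName.valueOf("fans"))) {
            CellScanner scanner = put.cellScanner();
            if (scanner.advance()) {
                Cell cell = scanner.current();
                Put fs = new Put(CellUtil.cloneValue(cell));
                fs.addColumn(Bytes.toBytes("f"), CellUtil.cloneRow(cell), CellUtil.cloneRow(cell));
                fans.put(fs);
            }
        }
    }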

Adding the coprocessor

alter 'star', METHOD => 'table_att', 'Coprocessor'=>'hdfs://dream1:9000/starcop/starcop.jar|com.ws.coprocessor.myCoprocess|100|'

The general form of the command is:

alter 'users', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|arg1=1,arg2=2'

'users' : the target table whose operations the coprocessor intercepts

METHOD => 'table_att' : tells the alter command to modify a table attribute of the 'users' table

hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar : the jar that contains the coprocessor class (here stored on HDFS)

org.myname.hbase.Coprocessor.RegionObserverExample : the fully qualified name of the custom coprocessor implementation class

1073741823 : the coprocessor priority; it can be any integer, and smaller values are executed first

arg1=1,arg2=2 : optional arguments passed to the custom coprocessor
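
Once the alter succeeds, you can check the whole chain from an ordinary client: put one row into 'star' and confirm that the observer has written the inverted entry into 'fans'. The sketch below is a hypothetical smoke test; the column family 'info' on 'star' and the row/qualifier values are assumptions, not part of the original setup, so adjust them to your schema.

package com.ws.coprocessor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class CoprocessorSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "dream1:2181,dream2:2181,dream3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            // 1. Write row 'starA' with value 'fan1' into 'star' (family 'info' is assumed).
            try (Table star = conn.getTable(TableName.valueOf("star"))) {
                Put put = new Put(Bytes.toBytes("starA"));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("fan"), Bytes.toBytes("fan1"));
                star.put(put);
            }
            // 2. The observer should now have written row 'fan1' with qualifier 'starA' into 'fans'.
            try (Table fans = conn.getTable(TableName.valueOf("fans"))) {
                Result r = fans.get(new Get(Bytes.toBytes("fan1")));
                byte[] v = r.getValue(Bytes.toBytes("f"), Bytes.toBytes("starA"));
                System.out.println("fans entry for fan1: " + (v == null ? "missing" : Bytes.toString(v)));
            }
        }
    }
}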

Removing a coprocessor

alter 'students', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
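
The same unset can also be done programmatically. This is a rough sketch, assuming the HBase 2.x Admin/TableDescriptorBuilder APIs and that the coprocessor was loaded on 'star' under the class name used earlier in this post; the class name passed to removeCoprocessor must match exactly what describe shows for the table.

package com.ws.coprocessor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class RemoveCoprocessor {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "dream1:2181,dream2:2181,dream3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName star = TableName.valueOf("star");
            // Fetch the current descriptor, drop the coprocessor entry by class name,
            // and push the modified descriptor back to the cluster.
            TableDescriptor current = admin.getDescriptor(star);
            TableDescriptor updated = TableDescriptorBuilder.newBuilder(current)
                    .removeCoprocessor("com.ws.coprocessor.myCoprocess")
                    .build();
            admin.modifyTable(updated);
        }
    }
}

Afterwards, describe 'star' should no longer list the coprocessor attribute.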