Hbase 2.0 RegionObserver使用
阿新 • • 發佈:2018-11-21
Hbase2.0 不支援 1.x版本的RegionObserver ,檢視hbase官網更新說明,自己做了測試並通過
- Hbase RegionObserver
import java.io.IOException; import java.util.List; import java.util.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience; import com.izhonghong.util.Md5Utils; //注:2.0版本之前使用extends BaseRegionObserver 實現, public class ObserverTest implements RegionObserver,RegionCoprocessor { private static final Log LOG = LogFactory.getLog(ObserverTest.class); static Connection connection = null; static Table table = null; static{ Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "test:2181"); try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf("tableName")); } catch (Exception e) { e.printStackTrace(); } } private RegionCoprocessorEnvironment env = null; private static final String FAMAILLY_NAME = "fn"; private static final String UID = "uid"; private static final String BIZ = "biz"; //2.0加入該方法,否則無法生效 @Override public Optional<RegionObserver> getRegionObserver() { // Extremely important to be sure that the coprocessor is invoked as a RegionObserver return Optional.of(this); } @Override public void start(CoprocessorEnvironment e) throws IOException { env = (RegionCoprocessorEnvironment) e; } @Override public void stop(CoprocessorEnvironment e) throws IOException { // nothing to do here } @Override public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final Durability durability) throws IOException { try { List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(UID)); if (list == null || list.size() == 0) { return; } Cell cell2 = list.get(0); String uid = Bytes.toString(CellUtil.cloneValue(cell2)); Put put2 = new Put(Bytes.toBytes(Md5Utils.getMd5ByStr(uid))); put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(BIZ), Bytes.toBytes(uid)); put2.setTTL(1000l * 60 * 60 *24 *30); table.put(put2); table.close(); } catch (Exception e1) { LOG.error("異常------->>>>>> "+e1.getMessage()); return ; } } }
- 協處理器安裝-表級別安裝
disable 'tableName'
alter 'tableName' , METHOD =>'table_att','coprocessor'=>'/hbase/coprocessor/hbase-coprocessor-1.0.0.jar|com.xxx.hbase.coprocessor.xxx|1001'
enable 'tableName'
- 協處理器解除安裝
disable 'tableName' alter 'tableName', METHOD => 'table_att_unset', NAME => 'coprocessor$1' enable 'tableName'