Source Code Walkthrough (2): hbase-examples BufferedMutator Example
阿新 • Published: 2019-02-11
1. Dropping HTable: building the BufferedMutator that HTable wraps and using it directly is a fully workable way to use the HBase client
In the earlier analysis of the HBase client source, we created the client like this:
```java
// The default Connection implementation is org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation
Connection connection = ConnectionFactory.createConnection(configuration);
// The default Table implementation is org.apache.hadoop.hbase.client.HTable
Table table = connection.getTable(TableName.valueOf("tableName"));
```
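- By default we get the Connection implementation org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation. The thing to note there is that its setupRegistry() method installs org.apache.hadoop.hbase.client.ZooKeeperRegistry, the key class for talking to ZooKeeper; all later interaction with ZooKeeper goes through it.
- Through the connection we then get the Table implementation org.apache.hadoop.hbase.client.HTable.
- Finally, org.apache.hadoop.hbase.client.HTable turns out to hold a field named mutator of type BufferedMutatorImpl, and its buffered write operations (such as put) all go through that mutator.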
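To make the delegation in the last point concrete, here is a toy model with no HBase dependency; ToyTable and CountingMutator are hypothetical names. It assumes, based on my reading of the HBase 1.x HTable source, that HTable.put() calls mutate() on its BufferedMutatorImpl and then flushes at once when autoFlush is on (the default). Treat that detail as an assumption and check it against your HBase version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of HTable-style delegation: the table keeps no write
// logic of its own and forwards every put to a buffered mutator.
class ToyTable implements AutoCloseable {

    // Minimal stand-in for BufferedMutator: buffers items and counts flushes.
    static final class CountingMutator {
        final List<String> buffer = new ArrayList<>();
        int flushes = 0;

        void mutate(String put) {
            buffer.add(put);
        }

        // Counts a flush only when something was actually sent.
        void flush() {
            if (!buffer.isEmpty()) {
                buffer.clear();
                flushes++;
            }
        }
    }

    private final CountingMutator mutator = new CountingMutator();
    private final boolean autoFlush;

    ToyTable(boolean autoFlush) {
        this.autoFlush = autoFlush;
    }

    // Mirrors the assumed HTable.put(): buffer the put, then flush at once if autoFlush.
    void put(String put) {
        mutator.mutate(put);
        if (autoFlush) {
            mutator.flush();
        }
    }

    int flushCount() {
        return mutator.flushes;
    }

    // Closing sends whatever is still buffered.
    @Override
    public void close() {
        mutator.flush();
    }
}
```

With autoFlush on, five puts cost five flushes (five round trips in the real client); with it off, nothing is sent until close(). Batching of that kind is what using a BufferedMutator directly gives you.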
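So for writing to HBase we can skip the HTable object entirely, build a BufferedMutator directly, and work through it. Sure enough, the hbase-examples module of the HBase source tree demonstrates exactly this usage; the key code is as follows: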
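```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;

public class BufferedMutatorWriter {
    private static final Log LOG = LogFactory.getLog(BufferedMutatorWriter.class);

    public static void main(String[] args) {
        Configuration configuration = HBaseConfiguration.create();
        // Assumption: the port value was lost from the original text; 2181 is ZooKeeper's default client port
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        // 2097152 bytes = 2 MB client-side write buffer
        configuration.set("hbase.client.write.buffer", "2097152");
        configuration.set("hbase.zookeeper.quorum", "192.168.199.31,192.168.199.32,192.168.199.33,192.168.199.34,192.168.199.35");
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("tableName"));
        // 3177 is not made up: it is 2 * hbase.client.write.buffer / put.heapSize()
        int bestBatchPutSize = 3177;
        // try-with-resources (JDK 1.7+): each resource in try(...) must implement java.lang.AutoCloseable
        // (java.io.Closeable extends it). On exit the resources are closed in reverse order of
        // declaration, i.e. mutator.close() first and then conn.close(), as a finally block would do.
        try (
            // Default Connection implementation: org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation
            Connection conn = ConnectionFactory.createConnection(configuration);
            // Default BufferedMutator implementation: org.apache.hadoop.hbase.client.BufferedMutatorImpl
            BufferedMutator mutator = conn.getBufferedMutator(params)
        ) {
            List<Put> putLists = new ArrayList<Put>();
            for (int count = 0; count < 100000; count++) {
                // Hypothetical rowkey scheme: the original snippet never defined rowkey
                String rowkey = "rowkey" + count;
                Put put = new Put(rowkey.getBytes());
                put.addImmutable("columnFamily1".getBytes(), "columnName1".getBytes(), "columnValue1".getBytes());
                put.addImmutable("columnFamily1".getBytes(), "columnName2".getBytes(), "columnValue2".getBytes());
                put.addImmutable("columnFamily1".getBytes(), "columnName3".getBytes(), "columnValue3".getBytes());
                // Skipping the WAL speeds up writes, but unflushed data is lost if a RegionServer crashes
                put.setDurability(Durability.SKIP_WAL);
                putLists.add(put);
                if (putLists.size() == bestBatchPutSize) {
                    // The batch has reached the best size: submit it right away
                    mutator.mutate(putLists);
                    mutator.flush();
                    putLists.clear();
                }
            }
            // Submit the remaining, not yet submitted puts in one final round
            mutator.mutate(putLists);
            mutator.flush();
        } catch (IOException e) {
            LOG.info("exception while creating/destroying Connection or BufferedMutator", e);
        }
    }
}
```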
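The batching loop and the close order can be checked offline, without a cluster. The sketch below is an assumption-laden dry run, not HBase code: FakeResource is a hypothetical stand-in for Connection and BufferedMutator, and the Put heap size of 1320 bytes is a hypothetical value picked so that 2 * 2097152 / 1320 = 3177 (the real Put.heapSize() depends on rowkey, column and value sizes). Unlike the snippet above, it skips the final round when nothing is left over.

```java
import java.util.ArrayList;
import java.util.List;

// Offline dry run of the batching loop: no HBase classes are used.
class BatchDryRun {

    // Hypothetical stand-in for Connection/BufferedMutator that records close() calls.
    static final class FakeResource implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        FakeResource(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name);
        }
    }

    // 2 * writeBufferSize / putHeapSize, the formula quoted in the example's comment.
    static int batchSize(long writeBufferSize, long putHeapSize) {
        return (int) (2 * writeBufferSize / putHeapSize);
    }

    // Counts the mutate()+flush() rounds the loop performs for totalPuts puts.
    static int countFlushes(int totalPuts, int batchSize) {
        List<Integer> pending = new ArrayList<>();
        int flushes = 0;
        for (int count = 0; count < totalPuts; count++) {
            pending.add(count);
            if (pending.size() == batchSize) {
                flushes++; // stands in for mutator.mutate(putLists); mutator.flush();
                pending.clear();
            }
        }
        if (!pending.isEmpty()) {
            flushes++; // final round for the leftover puts
        }
        return flushes;
    }

    // Returns the order in which try-with-resources closes the two resources.
    static List<String> closeOrder() {
        List<String> log = new ArrayList<>();
        try (FakeResource conn = new FakeResource("connection", log);
             FakeResource mutator = new FakeResource("mutator", log)) {
            // work with conn and mutator would happen here
        }
        return log;
    }

    public static void main(String[] args) {
        int size = batchSize(2097152L, 1320L);
        System.out.println("batch size: " + size);
        System.out.println("flush rounds: " + countFlushes(100000, size));
        System.out.println("close order: " + closeOrder());
    }
}
```

Running main prints a batch size of 3177, 32 flush rounds (31 full batches plus a final batch of 1513 puts) and the close order [mutator, connection].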
2. BufferedMutatorParams
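BufferedMutatorParams collects the parameters used to construct a BufferedMutator: the HBase table name, the client-side write buffer size, the maximum size of a single KeyValue (cell), the thread pool, and a callback listener for failed HBase operations (for example, failed writes).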
```java
package org.apache.hadoop.hbase.client;

import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
 * BufferedMutatorParams: holds the parameters used to construct a BufferedMutator
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BufferedMutatorParams {
    static final int UNSET = -1;
    private final TableName tableName;    // target HBase table
    private long writeBufferSize = UNSET; // client-side write buffer size
    private int maxKeyValueSize = UNSET;  // maximum size of a single KeyValue (cell)
    private ExecutorService pool = null;  // thread pool
    // Callback for failed HBase operations (e.g. failed writes); by default it just rethrows
    private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
        @Override
        public void onException(RetriesExhaustedWithDetailsException exception,
                BufferedMutator bufferedMutator)
                throws RetriesExhaustedWithDetailsException {
            throw exception;
        }
    };

    public BufferedMutatorParams(TableName tableName) { // constructor
        this.tableName = tableName;
    }

    public TableName getTableName() { // returns the table name
        return tableName;
    }

    public long getWriteBufferSize() { // returns the write buffer size
        return writeBufferSize;
    }

    /**
     * Sets the write buffer size; returns this so calls can be chained
     */
    public BufferedMutatorParams writeBufferSize(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
        return this;
    }

    public int getMaxKeyValueSize() { // returns the maximum KeyValue size
        return maxKeyValueSize;
    }

    /**
     * Sets the maximum KeyValue size; returns this so calls can be chained
     */
    public BufferedMutatorParams maxKeyValueSize(int maxKeyValueSize) {
        this.maxKeyValueSize = maxKeyValueSize;
        return this;
    }

    public ExecutorService getPool() { // returns the thread pool
        return pool;
    }

    public BufferedMutatorParams pool(ExecutorService pool) { // fluent setter for the thread pool
        this.pool = pool;
        return this;
    }

    public BufferedMutator.ExceptionListener getListener() { // returns the exception listener
        return listener;
    }

    public BufferedMutatorParams listener(BufferedMutator.ExceptionListener listener) { // fluent setter for the listener
        this.listener = listener;
        return this;
    }
}
```
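The UNSET = -1 defaults mean "not set by the caller". My reading of the HBase 1.x client is that BufferedMutatorImpl falls back to configuration values (such as hbase.client.write.buffer and hbase.client.keyvalue.maxsize) whenever a field is still UNSET; treat that as an assumption. The miniature below, with the hypothetical class ParamsSketch and hypothetical effective*() helpers, shows the fluent-setter pattern together with that fallback rule, passing the configuration default in as a plain number.

```java
// Hypothetical miniature of BufferedMutatorParams: a fluent builder whose
// numeric fields stay UNSET (-1) until the caller overrides them.
class ParamsSketch {
    static final int UNSET = -1;

    private final String tableName;
    private long writeBufferSize = UNSET;
    private int maxKeyValueSize = UNSET;

    ParamsSketch(String tableName) {
        this.tableName = tableName;
    }

    // Fluent setters return this so calls can be chained.
    ParamsSketch writeBufferSize(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
        return this;
    }

    ParamsSketch maxKeyValueSize(int maxKeyValueSize) {
        this.maxKeyValueSize = maxKeyValueSize;
        return this;
    }

    String getTableName() {
        return tableName;
    }

    // Assumed resolution rule: an explicit value wins, otherwise use the config default.
    long effectiveWriteBufferSize(long configDefault) {
        return writeBufferSize != UNSET ? writeBufferSize : configDefault;
    }

    int effectiveMaxKeyValueSize(int configDefault) {
        return maxKeyValueSize != UNSET ? maxKeyValueSize : configDefault;
    }

    public static void main(String[] args) {
        ParamsSketch defaults = new ParamsSketch("tableName");
        ParamsSketch tuned = new ParamsSketch("tableName").writeBufferSize(4L * 1024 * 1024);
        System.out.println(defaults.effectiveWriteBufferSize(2097152L)); // 2097152
        System.out.println(tuned.effectiveWriteBufferSize(2097152L));    // 4194304
    }
}
```

The sentinel keeps "use the cluster configuration" as the default, so a caller only overrides the values it cares about.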
3. BufferedMutator
BufferedMutator is an interface that mainly declares the following abstract methods:
```java
public interface BufferedMutator extends Closeable {
    TableName getName(); // returns the table name
    Configuration getConfiguration(); // returns the Hadoop Configuration object
    void mutate(Mutation mutation) throws IOException; // adds one mutation to the buffer
    void mutate(List<? extends Mutation> mutations) throws IOException; // adds a batch of mutations to the buffer
    @Override
    void close() throws IOException; // from Closeable: flushes and releases resources; lets JDK 1.7 try-with-resources close the object without a finally block
    void flush() throws IOException; // sends the buffered mutations to the HBase server
    long getWriteBufferSize(); // returns the write buffer size
    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    interface ExceptionListener { // listener notified when mutations fail after retries are exhausted
        public void onException(RetriesExhaustedWithDetailsException exception,
                BufferedMutator mutator) throws RetriesExhaustedWithDetailsException;
    }
}
```
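The contract above (mutate() buffers, flush() sends, close() flushes what is left) can be illustrated with a toy, in-memory implementation. This is a hypothetical sketch, not BufferedMutatorImpl: ToyBufferedMutator is an invented name, Strings stand in for Mutations, a String's length stands in for its heap size, and "sending to the server" just appends the batch to a list. The real implementation sends asynchronously and handles retries and failures, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Toy, in-memory take on the BufferedMutator contract (hypothetical names).
class ToyBufferedMutator implements AutoCloseable {
    private final long writeBufferSize;
    private final ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();
    private final AtomicLong currentBufferSize = new AtomicLong(0);
    // Everything "sent to the server" ends up here, one list per flush.
    private final List<List<String>> sentBatches = new ArrayList<>();

    ToyBufferedMutator(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    // mutate(): buffer the mutation and flush once the buffer exceeds the limit.
    void mutate(String mutation) {
        buffer.add(mutation);
        if (currentBufferSize.addAndGet(mutation.length()) > writeBufferSize) {
            flush();
        }
    }

    void mutate(List<String> mutations) {
        for (String m : mutations) {
            mutate(m);
        }
    }

    // flush(): drain the buffer and "send" its contents as one batch.
    synchronized void flush() {
        List<String> batch = new ArrayList<>();
        String m;
        while ((m = buffer.poll()) != null) {
            batch.add(m);
            currentBufferSize.addAndGet(-m.length());
        }
        if (!batch.isEmpty()) {
            sentBatches.add(batch);
        }
    }

    long getWriteBufferSize() {
        return writeBufferSize;
    }

    List<List<String>> getSentBatches() {
        return sentBatches;
    }

    // close(): flush whatever is left, which is why try-with-resources is safe to use.
    @Override
    public void close() {
        flush();
    }
}
```

With a 10-byte limit, mutating "aaaa", "bbbb" and "cccc" pushes the buffered size to 12 and triggers one batch of three; a later "dd" stays buffered until close() sends it as a second batch.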
4. BufferedMutatorImpl
Java code:
- package org.apache.hadoop.hbase.client;
- import com.google.common.annotations.VisibleForTesting;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.classification.InterfaceAudience;
- import org.apache.hadoop.hbase.classification.InterfaceStability;
- import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
- import java.io.IOException;
- import java.io.InterruptedIOException;
- import java.util.Arrays;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.concurrent.ConcurrentLinkedQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicLong;
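- /**
- * BufferedMutatorImpl is used starting with HBase 1.0.0.
- * It is mainly intended for operating on the same table from multiple threads.
- * Note that when several threads share one BufferedMutator, an error in one thread also surfaces in the other threads.
- */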
- @InterfaceAudience.Private
- @InterfaceStability.Evolving
- public class BufferedMutatorImpl implements BufferedMutator {
- private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
- private final ExceptionListener listener;// callback listener for client operations (notified on failures)
- protected ClusterConnection connection; // the connection this mutator holds
- private final TableName tableName;// target HBase table
- private volatile Configuration conf;// Hadoop Configuration
- @VisibleForTesting
- final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();// client-side write buffer queue
- @VisibleForTesting
- AtomicLong currentWriteBufferSize = new AtomicLong(0);// thread-safe long that accumulates the size of the data currently in the buffer
- private long writeBufferSize;// client-side write buffer size (flush threshold)
- private final <