1. 程式人生 > >原始碼解讀--(2)hbase-examples BufferedMutator Example

原始碼解讀--(2)hbase-examples BufferedMutator Example

1.摒棄HTable,直接建立HTable裡的BufferedMutator物件操作hbase客戶端完全可行

    在前面的hbase客戶端原始碼分析中,我們客戶端的建立方式如下:

Java程式碼  收藏程式碼
  1. //預設connection實現是org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation    
  2. Connection connection = ConnectionFactory.createConnection(configuration);        
  3. //預設table實現是org.apache.hadoop.hbase.client.HTable  
      
  4. Table table = connection.getTable(TableName.valueOf("tableName"));   
  1. 預設我們拿到了connection的實現org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation,裡面我們需要注意的是通過setupRegistry()類設定了與zookeeper互動的重要類org.apache.hadoop.hbase.client.ZookeeperRegistry類,後續與zookeeper互動都由此類完成
  2. 然後通過connection拿到了table的實現org.apache.hadoop.hbase.client.HTable
  3. 最後發現org.apache.hadoop.hbase.client.HTable歸根結底持有的就是BufferedMutatorImpl型別的屬性mutator,所有後續的操作都是基於mutator操作

    那麼其實我們操作hbase客戶端,完全可以摒棄HTable物件,直接構建BufferedMutator,然後操作hbase,正如所料在hbase的原始碼模組hbase-examples裡也正好提到了這種使用方法,使用的關鍵程式碼如下:

Java程式碼  收藏程式碼
  1. Configuration configuration = HBaseConfiguration.create();        
  2. configuration.set("hbase.zookeeper.property.clientPort"
    "2181");        
  3. configuration.set("hbase.client.write.buffer""2097152");        
  4. configuration.set("hbase.zookeeper.quorum","192.168.199.31,192.168.199.32,192.168.199.33,192.168.199.34,192.168.199.35");  
  5. BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("tableName"));  
  6. //3177不是我杜撰的,是2*hbase.client.write.buffer/put.heapSize()計算出來的     
  7. int bestBathPutSize = 3177;     
  8. //這裡利用jdk1.7裡的新特性try(必須實現java.io.Closeable的物件){}catch (Exception e) {}  
  9. //相當於呼叫了finally功能,呼叫(必須實現java.io.Closeable的物件)的close()方法,也即會呼叫conn.close(),mutator.close()  
  10. try(  
  11.   //預設connection實現是org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation   
  12.   Connection conn = ConnectionFactory.createConnection(configuration);  
  13.   //預設mutator實現是org.apache.hadoop.hbase.client.BufferedMutatorImpl  
  14.   BufferedMutator mutator = conn.getBufferedMutator(params);  
  15. ){           
  16.   List<Put> putLists = new ArrayList<Put>();      
  17.   for(int count=0;count<100000;count++){      
  18.     Put put = new Put(rowkey.getBytes());      
  19.     put.addImmutable("columnFamily1".getBytes(), "columnName1".getBytes(), "columnValue1".getBytes());      
  20.     put.addImmutable("columnFamily1".getBytes(), "columnName2".getBytes(), "columnValue2".getBytes());      
  21.     put.addImmutable("columnFamily1".getBytes(), "columnName3".getBytes(), "columnValue3".getBytes());      
  22.     put.setDurability(Durability.SKIP_WAL);    
  23.     putLists.add(put);      
  24.     if(putLists.size()==bestBathPutSize){      
  25.       //達到最佳大小值了,馬上提交一把      
  26.         mutator.mutate(putLists);     
  27.         mutator.flush();  
  28.         putLists.clear();  
  29.     }      
  30.   }      
  31.   //剩下的未提交資料,最後做一次提交         
  32.   mutator.mutate(putLists);     
  33.   mutator.flush();  
  34. }catch(IOException e) {  
  35.   LOG.info("exception while creating/destroying Connection or BufferedMutator", e);  
  36. }   

2.BufferedMutatorParams

    BufferedMutatorParams主要是收集構造BufferedMutator物件的引數資訊,這些引數包括hbase資料表名、hbase客戶端緩衝區、hbase rowkey最大所佔空間、執行緒池和監聽hbase操作的回撥監聽器(比如監聽hbase寫入失敗)

Java程式碼  收藏程式碼
  1. package org.apache.hadoop.hbase.client;  
  2. import java.util.concurrent.ExecutorService;  
  3. import org.apache.hadoop.hbase.TableName;  
  4. import org.apache.hadoop.hbase.classification.InterfaceAudience;  
  5. import org.apache.hadoop.hbase.classification.InterfaceStability;  
  6. /** 
  7.  * 構造BufferedMutator物件的類BufferedMutatorParams 
  8.  */  
  9. @InterfaceAudience.Public  
  10. @InterfaceStability.Evolving  
  11. public class BufferedMutatorParams {  
  12.   static final int UNSET = -1;  
  13.   private final TableName tableName;//hbase資料表  
  14.   private long writeBufferSize = UNSET;//hbase客戶端緩衝區  
  15.   private int maxKeyValueSize = UNSET;//hbase rowkey最大所佔空間  
  16.   private ExecutorService pool = null;//執行緒池  
  17.   private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {//監聽hbase操作的回撥監聽器,比如監聽hbase寫入失敗  
  18.     @Override  
  19.     public void onException(RetriesExhaustedWithDetailsException exception,  
  20.         BufferedMutator bufferedMutator)  
  21.         throws RetriesExhaustedWithDetailsException {  
  22.       throw exception;  
  23.     }  
  24.   };  
  25.   public BufferedMutatorParams(TableName tableName) {//構造方法  
  26.     this.tableName = tableName;  
  27.   }  
  28.   public TableName getTableName() {//獲取表名  
  29.     return tableName;  
  30.   }  
  31.   public long getWriteBufferSize() {//獲取寫緩衝區大小  
  32.     return writeBufferSize;  
  33.   }  
  34.   /** 
  35.    * 重寫緩衝區設定函式 
  36.    */  
  37.   public BufferedMutatorParams writeBufferSize(long writeBufferSize) {  
  38.     this.writeBufferSize = writeBufferSize;  
  39.     return this;  
  40.   }  
  41.   public int getMaxKeyValueSize() {//獲取rowkey所佔空間  
  42.     return maxKeyValueSize;  
  43.   }  
  44.   /** 
  45.    * 重寫設定rowkey所佔空間的函式 
  46.    */  
  47.   public BufferedMutatorParams maxKeyValueSize(int maxKeyValueSize) {  
  48.     this.maxKeyValueSize = maxKeyValueSize;  
  49.     return this;  
  50.   }  
  51.   public ExecutorService getPool() {//獲取執行緒池  
  52.     return pool;  
  53.   }  
  54.   public BufferedMutatorParams pool(ExecutorService pool) {//建構函式  
  55.     this.pool = pool;  
  56.     return this;  
  57.   }  
  58.   public BufferedMutator.ExceptionListener getListener() {//獲取監聽器  
  59.     return listener;  
  60.   }  
  61.   public BufferedMutatorParams listener(BufferedMutator.ExceptionListener listener) {//建構函式  
  62.     this.listener = listener;  
  63.     return this;  
  64.   }  
  65. }  

3.BufferedMutator

    BufferedMutator是一個介面,主要定義了一些抽象方法:

Java程式碼  收藏程式碼
  1. public interface BufferedMutator extends Closeable {  
  2.   TableName getName();//獲取表名  
  3.   Configuration getConfiguration();//獲取hadoop配置物件Configuration  
  4.   void mutate(Mutation mutation) throws IOException;//操作緩衝區  
  5.   void mutate(List<? extends Mutation> mutations) throws IOException;//批量操作緩衝區  
  6.   @Override  
  7.   void close() throws IOException;//實現Closeable介面,這樣可以利用JDK1.7新特性不寫finally就可以關閉物件  
  8.   void flush() throws IOException;//想hbase服務端提交資料請求  
  9.   long getWriteBufferSize();//獲取寫緩衝區大小  
  10.   @InterfaceAudience.Public  
  11.   @InterfaceStability.Evolving  
  12.   interface ExceptionListener {//監聽器  
  13.     public void onException(RetriesExhaustedWithDetailsException exception,  
  14.         BufferedMutator mutator) throws RetriesExhaustedWithDetailsException;  
  15.   }  
  16. }  

4.BufferedMutatorImpl

Java程式碼  收藏程式碼
  1. package org.apache.hadoop.hbase.client;  
  2. import com.google.common.annotations.VisibleForTesting;  
  3. import org.apache.commons.logging.Log;  
  4. import org.apache.commons.logging.LogFactory;  
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.hbase.TableName;  
  7. import org.apache.hadoop.hbase.classification.InterfaceAudience;  
  8. import org.apache.hadoop.hbase.classification.InterfaceStability;  
  9. import org.apache.hadoop.hbase.ipc.RpcControllerFactory;  
  10. import java.io.IOException;  
  11. import java.io.InterruptedIOException;  
  12. import java.util.Arrays;  
  13. import java.util.LinkedList;  
  14. import java.util.List;  
  15. import java.util.concurrent.ConcurrentLinkedQueue;  
  16. import java.util.concurrent.ExecutorService;  
  17. import java.util.concurrent.TimeUnit;  
  18. import java.util.concurrent.atomic.AtomicLong;  
  19. /** 
  20.  * hbase1.0.0才開始使用BufferedMutatorImpl 
  21.  * 主要用於在多執行緒中操作同一個資料表 
  22.  * 需要注意的是多執行緒中共享一個BufferedMutator物件,如果某個執行緒中出錯,其他執行緒也會出錯 
  23.  */  
  24. @InterfaceAudience.Private  
  25. @InterfaceStability.Evolving  
  26. public class BufferedMutatorImpl implements BufferedMutator {  
  27.   private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);  
  28.   private final ExceptionListener listener;//hbase客戶端每次操作的監聽回撥物件  
  29.   protected ClusterConnection connection; //持有的連結  
  30.   private final TableName tableName;//hbase資料表  
  31.   private volatile Configuration conf;//hadoop配置類Configuration  
  32.   @VisibleForTesting  
  33.   final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();//hbase緩衝區佇列  
  34.   @VisibleForTesting  
  35.   AtomicLong currentWriteBufferSize = new AtomicLong(0);//執行緒安全的長整型值,主要累計當前在緩衝區中資料所佔空間大小  
  36.   private long writeBufferSize;//hbase客戶端緩衝區大小  
  37.   private final <