1. 程式人生 > >Hbase基本操作類

Hbase基本操作類

package com.test.hbase;


import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fangdd.cdw2.data.dao.HBasePageModel;

public class HBaseOperator {
	
	/** Class-wide SLF4J logger. */
	private static final Logger LOG = LoggerFactory.getLogger(HBaseOperator.class);

	/** HBase client configuration; assigned once in the constructor. */
	private final Configuration conf;

	/** Shared connection; created on demand by getHConnection(). */
	private Connection connection;
	
	/**
	 * Builds the HBase client configuration used by this operator.
	 *
	 * @param zkConfs value stored under "hbase.zookeeper.quorum"
	 * @param zkNode  value stored under "zookeeper.znode.parent"
	 */
	public HBaseOperator(String zkConfs, String zkNode) {
		Configuration hbaseConf = HBaseConfiguration.create();
		hbaseConf.set("hbase.zookeeper.quorum", zkConfs);
		hbaseConf.set("zookeeper.znode.parent", zkNode);
		this.conf = hbaseConf;
	}
	
    /**
     * Returns the shared HBase connection, creating it on first use or when the
     * previous one has been closed. The method is synchronized so concurrent callers
     * go through the null/closed check one at a time.
     *
     * @return the open connection; never null
     * @throws IllegalStateException if the connection cannot be created; the
     *         underlying IOException is preserved as the cause
     */
    private synchronized Connection getHConnection() {
        try {
            if (this.connection == null || this.connection.isClosed()) {
                this.connection = ConnectionFactory.createConnection(conf);
                LOG.info("hbase connection init..................");
            }
            return this.connection;
        } catch (IOException e) {
            // Surface the real failure instead of returning null, which only
            // produced an unexplained NullPointerException in every caller.
            throw new IllegalStateException("failed to create hbase connection", e);
        }
    }
    
    /**
     * Lists the names of all tables reported by the cluster's Admin interface.
     *
     * @return array of table names as returned by {@code Admin.listTableNames()}
     * @throws Exception if the admin call fails
     */
    public TableName[] listTable() throws Exception {
        // try-with-resources closes the Admin even when the listing call throws.
        try (Admin admin = this.getHConnection().getAdmin()) {
            return admin.listTableNames();
        }
    }
    
    /**
     * Creates a table with the given column families.
     * <p>
     * If a table with the same name already exists it is disabled (when currently
     * enabled) and deleted first, so all data in the old table is lost.
     *
     * @param tableName table name
     * @param cfNames   column family names
     * @param version   maximum number of versions kept by each column family
     * @throws IOException if any admin operation fails
     */
    public void createTable(String tableName, String[] cfNames, int version) throws IOException {
        TableName table = TableName.valueOf(tableName);
        // try-with-resources closes the Admin on every path, including failures.
        try (Admin admin = this.getHConnection().getAdmin()) {
            if (admin.tableExists(table)) {
                // disableTable fails on a table that is already disabled.
                if (admin.isTableEnabled(table)) {
                    admin.disableTable(table);
                }
                admin.deleteTable(table);
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(table);
            for (String cfName : cfNames) {
                HColumnDescriptor family = new HColumnDescriptor(cfName);
                family.setMaxVersions(version);
                tableDescriptor.addFamily(family);
            }
            admin.createTable(tableDescriptor);
        }
    }
    
    /**
     * Writes one cell (columnFamily:qualifier = value) into the row identified by rowKey.
     * Row key, family, qualifier and value are all encoded with {@code Bytes.toBytes(String)}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param qualifier    column qualifier
     * @param value        value to store
     * @throws IOException if the write fails
     */
    public void put(String tableName, String rowKey, String columnFamily, String qualifier,
            String value) throws IOException {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value));
        // try-with-resources releases the Table even when the write throws.
        try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName))) {
            htable.put(put);
        }
    }
	
	/**
	 * Reads the value of columnFamily:qualifier for a single row, without requesting
	 * additional versions.
	 *
	 * @param tableName     table name
	 * @param rowKey        row key
	 * @param columnFamily  column family name
	 * @param qualifier     column qualifier
	 * @return the cell value decoded as a string, or null when the cell is missing
	 *         or its value is empty
	 * @throws IOException if the read fails
	 */
	public String getQualifierValue(String tableName, String rowKey,  String columnFamily,
			String qualifier) throws IOException {
		byte[] family = Bytes.toBytes(columnFamily);
		byte[] column = Bytes.toBytes(qualifier);
		Get get = new Get(Bytes.toBytes(rowKey));
		get.addColumn(family, column);
		// try-with-resources releases the Table even when the read throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName))) {
			byte[] value = htable.get(get).getValue(family, column);
			// An empty value is reported the same way as a missing cell.
			if (value != null && value.length > 0) {
				return Bytes.toString(value);
			}
			return null;
		}
	}
	
	
	/**
	 * Reads columnFamily:qualifier for every row key in rowKeys with one batched get.
	 * Rows that have no value for the column contribute nothing, so the returned list
	 * may be shorter than rowKeys.
	 *
	 * @param tableName     table name
	 * @param rowKeys       row keys to look up
	 * @param columnFamily  column family name
	 * @param qualifier     column qualifier
	 * @return the values that were found, or null when none of the rows has a value
	 * @throws IOException if the read fails
	 */
	public List<String> getQualifierValue(String tableName, String[] rowKeys,  String columnFamily,
			String qualifier) throws IOException {
		// Encode family and qualifier once instead of once per row.
		byte[] family = Bytes.toBytes(columnFamily);
		byte[] column = Bytes.toBytes(qualifier);
		List<Get> getList = new ArrayList<>(rowKeys.length);
		for (String rowKey : rowKeys) {
			Get get = new Get(Bytes.toBytes(rowKey));
			get.addColumn(family, column);
			getList.add(get);
		}
		// try-with-resources releases the Table even when the batch read throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName))) {
			Result[] results = htable.get(getList);
			if (results == null) {
				return null;
			}
			List<String> values = null;
			for (Result result : results) {
				byte[] value = result.getValue(family, column);
				if (value != null) {
					if (values == null) {
						values = new ArrayList<>();
					}
					values.add(Bytes.toString(value));
				}
			}
			return values;
		}
	}
	
    /**
     * Reads every stored version of columnFamily:qualifier for a single row.
     *
     * @param tableName     table name
     * @param rowKey        row key
     * @param columnFamily  column family name
     * @param qualifier     column qualifier
     * @return the values of all returned versions, in the order the cells are listed
     *         by the Result; null when the row has no such cell
     * @throws IOException if the read fails
     */
    public List<String> getAllVersionValues(String tableName, String rowKey,  String columnFamily,
            String qualifier) throws IOException {
        Get get = new Get(Bytes.toBytes(rowKey));
        get.setMaxVersions(); // request all versions instead of only the newest
        get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
        // try-with-resources releases the Table even when the read throws.
        try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName))) {
            List<Cell> cells = htable.get(get).listCells();
            if (cells == null || cells.isEmpty()) {
                return null;
            }
            List<String> values = new ArrayList<>(cells.size());
            for (Cell cell : cells) {
                values.add(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
            }
            return values;
        }
    }
	
	/**
	 * Reads all qualifiers of one column family for a single row.
	 *
	 * @param tableName     table name
	 * @param rowKey        row key
	 * @param columnFamily  column family name
	 * @return map of qualifier name to value, or null when the family has no cells
	 *         for this row
	 * @throws IOException if the read fails
	 */
	public Map<String, String> getQualifierMapValue(String tableName, String rowKey, String columnFamily) 
			throws IOException {
		byte[] family = Bytes.toBytes(columnFamily);
		Get get = new Get(Bytes.toBytes(rowKey));
		get.addFamily(family);
		// try-with-resources releases the Table even when the read throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName))) {
			NavigableMap<byte[], byte[]> qualifierValues = htable.get(get).getFamilyMap(family);
			if (qualifierValues == null || qualifierValues.isEmpty()) {
				return null;
			}
			Map<String, String> valuesMap = new HashMap<>();
			// Walk entries directly rather than looking each key up again.
			for (Map.Entry<byte[], byte[]> entry : qualifierValues.entrySet()) {
				valuesMap.put(Bytes.toString(entry.getKey()), Bytes.toString(entry.getValue()));
			}
			return valuesMap;
		}
	}
	
	
	/**
	 * Reads every column family of a single row.
	 * Qualifiers whose value decodes to null are skipped, and families left without
	 * any qualifier are omitted from the result.
	 *
	 * @param tableName table name
	 * @param rowKey    row key
	 * @return map of family name to (qualifier name to value); null when the row has
	 *         no cells
	 * @throws IOException if the read fails
	 */
	public Map<String, Map<String, String>> getQualifierMapValue(String tableName, String rowKey) 
			throws IOException {
		Get get = new Get(Bytes.toBytes(rowKey));
		// try-with-resources releases the Table even when the read throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName))) {
			NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyMap = htable.get(get).getNoVersionMap();
			if (familyMap == null || familyMap.isEmpty()) {
				return null;
			}
			Map<String, Map<String, String>> valuesMap = new HashMap<>();
			for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : familyMap.entrySet()) {
				NavigableMap<byte[], byte[]> qualifierMap = familyEntry.getValue();
				if (qualifierMap == null || qualifierMap.isEmpty()) {
					continue;
				}
				Map<String, String> qualifierValueMap = new HashMap<>();
				for (Map.Entry<byte[], byte[]> cellEntry : qualifierMap.entrySet()) {
					String qualifierValue = Bytes.toString(cellEntry.getValue());
					if (qualifierValue != null) {
						qualifierValueMap.put(Bytes.toString(cellEntry.getKey()), qualifierValue);
					}
				}
				if (!qualifierValueMap.isEmpty()) {
					valuesMap.put(Bytes.toString(familyEntry.getKey()), qualifierValueMap);
				}
			}
			return valuesMap;
		}
	}
	
	/**
	 * Reads all versions of columnFamily:qualifier whose timestamp lies in
	 * [startTimestamp, endTimestamp), i.e. endTimestamp itself is excluded.
	 *
	 * @param tableName        table name
	 * @param rowKey           row key
	 * @param columnFamily     column family name
	 * @param qualifier        column qualifier
	 * @param startTimestamp   lower bound of the time range (inclusive)
	 * @param endTimestamp     upper bound of the time range (exclusive)
	 * @return values of the matching versions, or null when no cell matches
	 * @throws IOException if the read fails
	 */
	public List<String> getQualifierValuesByRange(String tableName, String rowKey, String columnFamily,
			String qualifier, long startTimestamp, long endTimestamp) throws IOException {
		Get get = new Get(Bytes.toBytes(rowKey));
		get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
		get.setMaxVersions();
		get.setTimeRange(startTimestamp, endTimestamp);
		// try-with-resources releases the Table even when the read throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName))) {
			List<Cell> cells = htable.get(get).listCells();
			if (cells == null || cells.isEmpty()) {
				return null;
			}
			List<String> values = new ArrayList<>(cells.size());
			for (Cell cell : cells) {
				values.add(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
			}
			return values;
		}
	}
	
	/**
	 * Scans the row keys between startRowKey and endRowKey (both inclusive) that pass
	 * the optional filter.
	 *
	 * @param tableName     table name
	 * @param startRowKey   first row key of the range
	 * @param endRowKey     last row key of the range
	 * @param filterList    filters to apply; may be null for no filtering
	 * @return matching row keys, or null when either bound is null/empty or nothing matches
	 * @throws IOException if the scan fails
	 */
	public List<String> scanRowkeys(String tableName, String startRowKey, String endRowKey, 
			FilterList filterList) throws IOException {
		if (startRowKey == null || startRowKey.equals("")
				|| endRowKey == null || endRowKey.equals("")) {
			return null;
		}
		final byte[] POSTFIX = new byte[] { 0x00 };
		Scan scan = new Scan();
		scan.setStartRow(Bytes.toBytes(startRowKey));
		// The stop row is exclusive; appending a zero byte makes endRowKey itself part of the scan.
		scan.setStopRow(Bytes.add(Bytes.toBytes(endRowKey), POSTFIX));
		if (filterList != null) {
			scan.setFilter(filterList);
		}
		// try-with-resources closes scanner and table (in that order) even when iteration throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName));
				ResultScanner resultScanner = htable.getScanner(scan)) {
			List<String> list = null;
			for (Result result : resultScanner) {
				byte[] rowBytes = result.getRow();
				if (rowBytes != null) {
					if (list == null) {
						list = new ArrayList<>();
					}
					list.add(Bytes.toString(rowBytes));
				}
			}
			return list;
		}
	}
	
	/**
	 * Scans the rows between startRowKey and endRowKey (both inclusive) that pass the
	 * optional filter and converts each row with getResultRow.
	 *
	 * @param tableName     table name
	 * @param startRowKey   first row key of the range
	 * @param endRowKey     last row key of the range
	 * @param filterList    filters to apply; may be null for no filtering
	 * @return one family -> (qualifier -> value) map per row, or null when either
	 *         bound is null/empty or nothing matches
	 * @throws IOException if the scan fails
	 */
	public List<Map<String, Map<String, String>>> scanRangeRows(String tableName, String startRowKey, 
			String endRowKey, FilterList filterList) throws IOException {
		if (startRowKey == null || startRowKey.equals("")
				|| endRowKey == null || endRowKey.equals("")) {
			return null;
		}
		final byte[] POSTFIX = new byte[] { 0x00 };
		Scan scan = new Scan();
		scan.setStartRow(Bytes.toBytes(startRowKey));
		// The stop row is exclusive; appending a zero byte makes endRowKey itself part of the scan.
		scan.setStopRow(Bytes.add(Bytes.toBytes(endRowKey), POSTFIX));
		if (filterList != null) {
			scan.setFilter(filterList);
		}
		// try-with-resources closes scanner and table (in that order) even when iteration throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName));
				ResultScanner resultScanner = htable.getScanner(scan)) {
			List<Map<String, Map<String, String>>> list = null;
			for (Result result : resultScanner) {
				Map<String, Map<String, String>> familyMap = getResultRow(result);
				if (familyMap != null && familyMap.size() > 0) {
					if (list == null) {
						list = new ArrayList<>();
					}
					list.add(familyMap);
				}
			}
			return list;
		}
	}
	
    /**
     * Scans the rows between startRowKey and endRowKey (both inclusive), keeps the
     * rows that fall on the requested page, and counts every scanned row so the
     * page model can carry the total.
     * <p>
     * Paging is done on the client: the whole range is scanned on every call.
     *
     * @param tableName     table name
     * @param startRowKey   first row key of the range
     * @param endRowKey     last row key of the range
     * @param pageIndex     1-based page number; null or non-positive values mean page 1
     * @param pageSize      rows per page; null or non-positive values mean 50
     * @param filterList    filters to apply; may be null for no filtering
     * @return page model built from the page index, page size and scanned-row count,
     *         with the page's rows attached (the row list is null when the page is
     *         empty); null when either row-key bound is null/empty
     * @throws IOException if the scan fails
     */
    public HBasePageModel scanPageRows(String tableName, String startRowKey, 
            String endRowKey, Integer pageIndex, Integer pageSize, FilterList filterList) throws IOException {
        if (startRowKey == null || startRowKey.equals("")
                || endRowKey == null || endRowKey.equals("")) {
            return null;
        }
        if (pageIndex == null || pageIndex <= 0) {
            pageIndex = 1;       // default: first page
        }
        if (pageSize == null || pageSize <= 0) {
            pageSize = 50;       // default: 50 rows per page
        }
        // long arithmetic: (pageIndex - 1) * pageSize can exceed the int range.
        long pageStartIndex = (pageIndex - 1L) * pageSize;
        long pageEndIndex = pageStartIndex + pageSize;

        final byte[] POSTFIX = new byte[] { 0x00 };
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(startRowKey));
        // The stop row is exclusive; appending a zero byte makes endRowKey itself part of the scan.
        scan.setStopRow(Bytes.add(Bytes.toBytes(endRowKey), POSTFIX));
        if (filterList != null) {
            scan.setFilter(filterList);
        }
        // try-with-resources closes scanner and table (in that order) even when iteration throws.
        try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName));
                ResultScanner resultScanner = htable.getScanner(scan)) {
            List<Map<String, Map<String, String>>> resultList = null;
            int i = 0;
            for (Result result : resultScanner) {
                Map<String, Map<String, String>> familyMap = getResultRow(result);
                if (familyMap != null && familyMap.size() > 0
                        && i >= pageStartIndex && i < pageEndIndex) {  // row lies on the requested page
                    if (resultList == null) {
                        resultList = new ArrayList<>();
                    }
                    resultList.add(familyMap);
                }
                i++;
            }
            HBasePageModel pageModel = new HBasePageModel(pageIndex, pageSize, i);
            pageModel.setResultList(resultList);
            return pageModel;
        }
    }
	
	/**
	 * Reads up to pageSize rows that come strictly after lastRowKey.
	 *
	 * @param tableName  table name
	 * @param lastRowKey last row key of the previous page; the scan starts just after it
	 * @param pageSize   maximum number of rows to return
	 * @return one family -> (qualifier -> value) map per row, or null when lastRowKey
	 *         is null/empty or no row follows it
	 * @throws IOException if the scan fails
	 */
	private List<Map<String, Map<String, String>>> scanPageRows(String tableName,
			 String lastRowKey, int pageSize) throws IOException {
		if (lastRowKey == null || lastRowKey.equals("")) {
			return null;
		}
		final byte[] POSTFIX = new byte[] { 0x00 };
		Scan scan = new Scan();
		FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
		Filter pageFilter = new PageFilter(pageSize);
		filterList.addFilter(pageFilter);
		scan.setFilter(filterList);
		// Appending a zero byte yields the smallest key greater than lastRowKey, so that row is skipped.
		scan.setStartRow(Bytes.add(Bytes.toBytes(lastRowKey), POSTFIX));

		// try-with-resources closes scanner and table (in that order) even when iteration throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName));
				ResultScanner resultScanner = htable.getScanner(scan)) {
			List<Map<String, Map<String, String>>> list = null;
			int accepted = 0;
			for (Result result : resultScanner) {
				// PageFilter is evaluated per region server, so a scan spanning several
				// regions can deliver more than pageSize rows; enforce the limit here too.
				if (accepted >= pageSize) {
					break;
				}
				Map<String, Map<String, String>> familyMap = getResultRow(result);
				if (familyMap != null && familyMap.size() > 0) {
					if (list == null) {
						list = new ArrayList<>();
					}
					list.add(familyMap);
					accepted++;
				}
			}
			return list;
		}
	}
	
	/**
	 * Converts one row Result into a nested map: family name -> (qualifier name -> value).
	 * When a qualifier occurs in several cells, the cell processed last wins.
	 * <p>
	 * Names and values are decoded with {@code Bytes.toString}, the counterpart of the
	 * {@code Bytes.toBytes(String)} encoding used by put(). {@code Bytes.toStringBinary}
	 * is a debug representation that hex-escapes non-printable bytes and would garble
	 * non-ASCII text.
	 *
	 * @param result  Result of a single row; may be null
	 * @return the nested map (empty when the Result has no cells), or null when result is null
	 */
	private Map<String, Map<String, String>> getResultRow(Result result) {
		if (result == null) {
			return null;
		}
		Map<String, Map<String, String>> familyMap = new HashMap<>();
		for (Cell cell : result.rawCells()) {
			String columnFamilyName = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
			String qualifierName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
			String qualifierValue = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
			Map<String, String> qualifierMap = familyMap.get(columnFamilyName);
			if (qualifierMap == null) {
				qualifierMap = new HashMap<>();
				familyMap.put(columnFamilyName, qualifierMap);
			}
			qualifierMap.put(qualifierName, qualifierValue);
		}
		return familyMap;
	}
	
	/**
	 * Deletes the rows identified by rowKeys in one batched call.
	 * <p>
	 * Row keys are encoded with {@code Bytes.toBytes(String)}, the same encoding every
	 * read and write method of this class uses. Encoding them as 8-byte longs would
	 * target different rows and fail on non-numeric keys.
	 *
	 * @param tableName  table name
	 * @param rowKeys    row keys to delete
	 * @throws IOException if the delete fails
	 */
	public void deleteRow(String tableName, String[] rowKeys) throws IOException {
		List<Delete> list = new ArrayList<>(rowKeys.length);
		for (String rowKey : rowKeys) {
			list.add(new Delete(Bytes.toBytes(rowKey)));
		}
		// try-with-resources releases the Table even when the delete throws.
		try (Table htable = this.getHConnection().getTable(TableName.valueOf(tableName))) {
			htable.delete(list);
		}
	}
    
    /**
     * Checks whether a table exists.
     *
     * @param tableName table name
     * @return true when the Admin interface reports the table as existing
     * @throws IOException if the admin call fails
     */
    public boolean isExist(String tableName) throws IOException {
        TableName table = TableName.valueOf(tableName);
        // try-with-resources closes the Admin even when the check throws.
        try (Admin admin = this.getHConnection().getAdmin()) {
            return admin.tableExists(table);
        }
    }
    
    /**
     * Deletes a table if it exists; does nothing otherwise.
     * An enabled table is disabled first, because HBase only deletes disabled tables.
     *
     * @param tableName table name
     * @throws IOException if any admin operation fails
     */
    public void deleteTable(String tableName) throws IOException {
        TableName table = TableName.valueOf(tableName);
        // try-with-resources closes the Admin on every path, including failures.
        try (Admin admin = this.getHConnection().getAdmin()) {
            if (admin.tableExists(table)) {
                // disableTable fails on a table that is already disabled.
                if (admin.isTableEnabled(table)) {
                    admin.disableTable(table);
                }
                admin.deleteTable(table);
            }
        }
    }
    
    /**
     * Debug helper: prints one cell to standard output as
     * family_qualifier_value_timestamp.
     *
     * @param cell the cell to print
     */
    private static void dumpCell(Cell cell) {
        String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
        String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
        String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

        StringBuilder line = new StringBuilder();
        line.append(family).append("_")
            .append(qualifier).append("_")
            .append(value).append("_")
            .append(cell.getTimestamp());
        System.out.println(line.toString());
    }

}