HBase API 常用方法使用及預分割槽解決熱點問題
阿新 • • 發佈:2019-02-16
API 操作:
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.kktest.hbase.HashChoreWoker; import com.kktest.hbase.HashRowKeyGenerator; import com.kktest.hbase.RowKeyGenerator; import com.kktest.hbase.BitUtils; /** * hbase 客戶端 * * @author kuang hj * */ @SuppressWarnings("all") public class HBaseClient { private static Logger logger = LoggerFactory.getLogger(HBaseClient.class); private static Configuration config; static { config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", "192.168.1.100:2181,192.168.1.101:2181,192.168.1.103:2181"); } /** * 根據隨機雜湊(hash)建立分割槽表 * * @throws Exception * hash_split_table */ public static void testHashAndCreateTable(String tableNameTmp, String columnFamily) throws Exception {<p> // 取隨機雜湊 10 代表 10個分割槽 HashChoreWoker worker = new HashChoreWoker(1000000, 10); 
byte[][] splitKeys = worker.calcSplitKeys(); HBaseAdmin admin = new HBaseAdmin(config); TableName tableName = TableName.valueOf(tableNameTmp); if (admin.tableExists(tableName)) { try { admin.disableTable(tableName); } catch (Exception e) { } admin.deleteTable(tableName); } HTableDescriptor tableDesc = new HTableDescriptor(tableName); HColumnDescriptor columnDesc = new HColumnDescriptor( Bytes.toBytes(columnFamily)); columnDesc.setMaxVersions(1); tableDesc.addFamily(columnDesc); admin.createTable(tableDesc, splitKeys); admin.close(); } /** * @Title: queryData * @Description: 從HBase查詢出資料 * @author kuang hj * @param tableName * 表名 * @param rowkey * rowkey * @return 返回使用者資訊的list * @throws Exception */ @SuppressWarnings("all") public static ArrayList<String> queryData(String tableName, String rowkey) throws Exception { ArrayList<String> list = new ArrayList<String>(); logger.info("開始時間"); HTable table = new HTable(config, tableName); Get get = new Get(rowkey.getBytes()); // 根據主鍵查詢 Result r = table.get(get); logger.info("結束時間"); KeyValue[] kv = r.raw(); for (int i = 0; i < kv.length; i++) { // 迴圈每一列 String key = kv[i].getKeyString(); String value = kv[i].getValueArray().toString(); // 將查詢到的結果寫入List中 list.add(key + ":"+ value); }// end of 遍歷每一列 return list; } /** * 增加表資料 * * @param tableName * @param rowkey */ public static void insertData(String tableName, String rowkey) { HTable table = null; try { table = new HTable(config, tableName); // 一個PUT代表一行資料,再NEW一個PUT表示第二行資料,每行一個唯一的ROWKEY,此處rowkey為put構造方法中傳入的值 for (int i = 1; i < 100; i++) { byte[] result = getNumRowkey(rowkey,i); Put put = new Put(result); // 本行資料的第一列 put.add(rowkey.getBytes(), "name".getBytes(), ("aaa" + i).getBytes()); // 本行資料的第三列 put.add(rowkey.getBytes(), "age".getBytes(), ("bbb" + i).getBytes()); // 本行資料的第三列 put.add(rowkey.getBytes(), "address".getBytes(), ("ccc" + i).getBytes()); table.put(put); } } catch (Exception e1) { e1.printStackTrace(); } } private static byte[] getNewRowkey(String rowkey) { 
byte[] result = null; RowKeyGenerator rkGen = new HashRowKeyGenerator(); byte[] splitKeys = rkGen.nextId(); byte[] rowkeytmp = rowkey.getBytes(); result = new byte[splitKeys.length + rowkeytmp.length]; System.arraycopy(splitKeys, 0, result, 0, splitKeys.length); System.arraycopy(rowkeytmp, 0, result, splitKeys.length, rowkeytmp.length); return result; } public static void main(String[] args) { RowKeyGenerator rkGen = new HashRowKeyGenerator(); byte[] splitKeys = rkGen.nextId(); System.out.println(splitKeys); } private static byte[] getNumRowkey(String rowkey, int i) { byte[] result = null; RowKeyGenerator rkGen = new HashRowKeyGenerator(); byte[] splitKeys = rkGen.nextId(); byte[] rowkeytmp = rowkey.getBytes(); byte[] intVal = BitUtils.getByteByInt(i); result = new byte[splitKeys.length + rowkeytmp.length + intVal.length]; System.arraycopy(splitKeys, 0, result, 0, splitKeys.length); System.arraycopy(rowkeytmp, 0, result, splitKeys.length, rowkeytmp.length); System.arraycopy(intVal, 0, result, splitKeys.length+rowkeytmp.length, intVal.length); return result; } /** * 刪除表 * * @param tableName */ public static void dropTable(String tableName) { try { HBaseAdmin admin = new HBaseAdmin(config); admin.disableTable(tableName); admin.deleteTable(tableName); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 查詢所有 * * @param tableName */ public static void QueryAll(String tableName) { HTable table = null; try { table = new HTable(config, tableName); ResultScanner rs = table.getScanner(new Scan()); for (Result r : rs) { System.out.println("獲得到rowkey:" + new String(r.getRow())); for (KeyValue keyValue : r.raw()) { System.out.println("列:" + new String(keyValue.getFamily()) + "====值:" + new String(keyValue.getValue())); } } } catch (IOException e) { e.printStackTrace(); } } /** * 查詢所有 * * @param tableName */ public static void QueryByCondition1(String 
tableName) { HTable table = null; try { table = new HTable(config, tableName); Get scan = new Get("abcdef".getBytes());// 根據rowkey查詢 Result r = table.get(scan); System.out.println("獲得到rowkey:" + new String(r.getRow())); for (KeyValue keyValue : r.raw()) { System.out.println("列:" + new String(keyValue.getFamily()) + "====值:" + new String(keyValue.getValue())); } } catch (IOException e) { e.printStackTrace(); } } /** * 根據rowkwy前墜查詢 * @param tableName * @param rowkey */ public static void queryByRowKey(String tableName,String rowkey) { try { HTable table = new HTable(config, tableName); Scan scan = new Scan(); scan.setFilter(new PrefixFilter(rowkey.getBytes())); ResultScanner rs = table.getScanner(scan); KeyValue[] kvs = null; for (Result tmp : rs) { kvs = tmp.raw(); for (KeyValue kv : kvs) { System.out.print(kv.getRow()+" "); System.out.print(kv.getFamily()+" :"); System.out.print(kv.getQualifier()+" "); System.out.print(kv.getTimestamp()+" "); System.out.println(kv.getValue()); } } } catch (IOException e) { e.printStackTrace(); } } /** * 查詢所有 * * @param tableName */ public static void QueryByCondition2(String tableName) { try { HTable table = new HTable(config, tableName); // 當列column1的值為aaa時進行查詢 Filter filter = new SingleColumnValueFilter( Bytes.toBytes("column1"), null, CompareOp.EQUAL, Bytes.toBytes("aaa")); Scan s = new Scan(); s.setFilter(filter); ResultScanner rs = table.getScanner(s); for (Result r : rs) { System.out.println("獲得到rowkey:" + new String(r.getRow())); for (KeyValue keyValue : r.raw()) { System.out.println("列:" + new String(keyValue.getFamily()) + "====值:" + new String(keyValue.getValue())); } } } catch (Exception e) { e.printStackTrace(); } } /** * 查詢所有 * * @param tableName */ public static void QueryByCondition3(String tableName) { try { HTable table = new HTable(config, tableName); List<Filter> filters = new ArrayList<Filter>(); Filter filter1 = new SingleColumnValueFilter( Bytes.toBytes("column1"), null, CompareOp.EQUAL, Bytes.toBytes("aaa")); 
filters.add(filter1); Filter filter2 = new SingleColumnValueFilter( Bytes.toBytes("column2"), null, CompareOp.EQUAL, Bytes.toBytes("bbb")); filters.add(filter2); Filter filter3 = new SingleColumnValueFilter( Bytes.toBytes("column3"), null, CompareOp.EQUAL, Bytes.toBytes("ccc")); filters.add(filter3); FilterList filterList1 = new FilterList(filters); Scan scan = new Scan(); scan.setFilter(filterList1); ResultScanner rs = table.getScanner(scan); for (Result r : rs) { System.out.println("獲得到rowkey:" + new String(r.getRow())); for (KeyValue keyValue : r.raw()) { System.out.println("列:" + new String(keyValue.getFamily()) + "====值:" + new String(keyValue.getValue())); } } rs.close(); } catch (Exception e) { e.printStackTrace(); } } }</p>
HashChoreWoker: import java.util.Iterator; import java.util.TreeSet; import org.apache.hadoop.hbase.util.Bytes; /** * * @author kuang hj * */ public class HashChoreWoker{ // 隨機取機數目 private int baseRecord; // rowkey生成器 private RowKeyGenerator rkGen; // 取樣時,由取樣數目及region數相除所得的數量. private int splitKeysBase; // splitkeys個數 private int splitKeysNumber; // 由抽樣計算出來的splitkeys結果 private byte[][] splitKeys; public HashChoreWoker(int baseRecord, int prepareRegions) { this.baseRecord = baseRecord; // 例項化rowkey生成器 rkGen = new HashRowKeyGenerator(); splitKeysNumber = prepareRegions - 1; splitKeysBase = baseRecord / prepareRegions; } public byte[][] calcSplitKeys() { splitKeys = new byte[splitKeysNumber][]; // 使用treeset儲存抽樣資料,已排序過 TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); for (int i = 0; i < baseRecord; i++) { rows.add(rkGen.nextId()); } int pointer = 0; Iterator<byte[]> rowKeyIter = rows.iterator(); int index = 0; while (rowKeyIter.hasNext()) { byte[] tempRow = rowKeyIter.next(); rowKeyIter.remove(); if ((pointer != 0) && (pointer % splitKeysBase == 0)) { if (index < splitKeysNumber) { splitKeys[index] = tempRow; index++; } } pointer++; } rows.clear(); rows = null; return splitKeys; } }
HashRowKeyGenerator: import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.MD5Hash; import com.kktest.hbase.BitUtils; /** * * **/ public class HashRowKeyGenerator implements RowKeyGenerator { private static long currentId = 1; private static long currentTime = System.currentTimeMillis(); //private static Random random = new Random(); public byte[] nextId() { try { currentTime = getRowKeyResult(Long.MAX_VALUE - currentTime); byte[] lowT = Bytes.copy(Bytes.toBytes(currentTime), 4, 4); byte[] lowU = Bytes.copy(Bytes.toBytes(currentId), 4, 4); byte[] result = Bytes.add(MD5Hash.getMD5AsHex(Bytes.add(lowT, lowU)) .substring(0, 8).getBytes(), Bytes.toBytes(currentId)); return result; } finally { currentId++; } } /** * getRowKeyResult * @param tmpData * @return */ public static long getRowKeyResult(long tmpData) { String str = String.valueOf(tmpData); StringBuffer sb = new StringBuffer(); char[] charStr = str.toCharArray(); for (int i = charStr.length -1 ; i > 0; i--) { sb.append(charStr[i]); } return Long.parseLong(sb.toString()); } }
</pre><pre name="code" class="java">