package cn.edu.shu.ces.chenjie.tianyi.hbase.utils; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import com.google.gson.Gson; import cn.edu.shu.ces.chenjie.tianyi.hive.model.ClientData; /*** * 使用Java訪問HBase * @author ChenJie * */ public class HbaseDataSourceUtil { private static Configuration conf = null; //配置檔案 private static Admin admin = null; //管理員 public static Connection conn = null; //連線 /** * 初始化連線 * @throws IOException */ public static void init() throws IOException { System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.5"); // 必備條件之一 //執行這個程式的計算機都要配置hadoop conf = HBaseConfiguration.create(); //建立一個配置 conf.set("hbase.zookeeper.quorum", "pc2,pc3,pc4"); //設定zookeeper的結點主機名 conf.set("hbase.zookeeper.property.clientPort", "2181"); //設定zookeepe埠號 conn = ConnectionFactory.createConnection(conf); //使用配置建立一個HBase連線 admin = conn.getAdmin(); //得到管理器 } /*** * 建表 * @param tableName 表名 * @param families 列簇 * @throws IOException */ public static void createTable(String tableName, String[] families) throws IOException { init(); //初始化 if (admin.tableExists(TableName.valueOf(tableName))) //如果表已經存在 { System.out.println(tableName + "已存在"); } else //如果表不存在 { HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName)); //使用表名建立一個表描述器HTableDescriptor物件 for (String family : families) { tableDesc.addFamily(new HColumnDescriptor(family)); //新增列族 } admin.createTable(tableDesc); //建立表 System.out.println("Table created"); } } /** * 新增列簇 * @param tableName 表名 * @param family 列族名 */ public static void addFamily(String tableName, String family) { try { init(); HColumnDescriptor columnDesc = new HColumnDescriptor(family); admin.addColumn(TableName.valueOf(tableName), columnDesc); } catch (IOException e) { e.printStackTrace(); } finally { destroy(); } } /** * 查詢表資訊 * @param conn * @param tableName */ public static void query(String tableName) { HTable hTable = null; ResultScanner scann = null; try { init(); hTable = (HTable) conn.getTable(TableName.valueOf(tableName)); scann = hTable.getScanner(new Scan()); for (Result rs : scann) { System.out.println("RowKey為:" + new String(rs.getRow())); // 按cell進行迴圈 for (Cell cell : rs.rawCells()) { System.out.println("列簇為:" + new String(CellUtil.cloneFamily(cell))); System.out.println("列修飾符為:" + new String(CellUtil.cloneQualifier(cell))); System.out.println("值為:" + new String(CellUtil.cloneValue(cell))); } System.out.println("============================================="); } } catch (IOException e) { e.printStackTrace(); } finally { if (scann != null) { scann.close(); } if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } destroy(); } } public static ResultScanner query2(String tableName) { HTable hTable = null; ResultScanner scann = null; try { init(); hTable = (HTable) conn.getTable(TableName.valueOf(tableName)); scann = hTable.getScanner(new Scan()); for (Result rs : scann) { System.out.println("RowKey為:" + new String(rs.getRow())); // 按cell進行迴圈 for (Cell cell : rs.rawCells()) { System.out.println("列簇為:" + new String(CellUtil.cloneFamily(cell))); System.out.println("列修飾符為:" + new String(CellUtil.cloneQualifier(cell))); System.out.println("值為:" + new String(CellUtil.cloneValue(cell))); } System.out.println("============================================="); } return scann; } catch (IOException e) { e.printStackTrace(); } finally { if (scann != null) { scann.close(); } if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } destroy(); } return null; } /** * 根據rowkey查詢單行 * * @param conn * @param key * @param tableName */ public static void queryByRowKey(String key, String tableName) { HTable hTable = null; try { init(); hTable = (HTable) conn.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(key)); get.setMaxVersions(3); // will return last 3 versions of row Result rs = hTable.get(get); System.out.println(tableName + "表RowKey為" + key + "的行資料如下:"); for (Cell cell : rs.rawCells()) { //System.out.println("key為:" + new String(CellUtil.cloneRow(cell))); System.out.println("\t列簇為:" + new String(CellUtil.cloneFamily(cell))); System.out.println("\t列識別符號為:" + new String(CellUtil.cloneQualifier(cell))); System.out.println("\t值為:" + new String(CellUtil.cloneValue(cell))); System.out.println("\t----------------------------------------------------------"); } } catch (IOException e) { e.printStackTrace(); } finally { if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } destroy(); } } /** * 插入單行單列簇單列修飾符資料 * * @param conn * @param tableName * @param key * @param family * @param col * @param val */ public static void addOneRecord(String tableName, String key, String family, String col, String val) { HTable hTable = null; try { init(); hTable = (HTable) conn.getTable(TableName.valueOf(tableName)); Put p = new Put(Bytes.toBytes(key)); p.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), Bytes.toBytes(val)); if (p.isEmpty()) { System.out.println("資料插入異常,請確認資料完整性,稍候重試"); } else { hTable.put(p); } } catch (IOException e) { e.printStackTrace(); } finally { if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } destroy(); } } /** * 插入單行單列簇多列修飾符資料 * * @param conn * @param tableName * @param key * @param family * @param cols * @param val */ public static void addMoreRecord(String tableName, String key, String family, Map<String, String> colVal) { HTable hTable = null; try { init(); hTable = (HTable) conn.getTable(TableName.valueOf(tableName)); Put p = new Put(Bytes.toBytes(key)); for (String col : colVal.keySet()) { String val = colVal.get(col); if (StringUtils.isNotBlank(val)) { p.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), Bytes.toBytes(val)); } else { System.out.println("列值為空,請確認資料完整性"); } } // 當put物件沒有成功插入資料時,此時呼叫hTable.put(p)方法會報錯:java.lang.IllegalArgumentException:No // columns to insert if (p.isEmpty()) { System.out.println("資料插入異常,請確認資料完整性,稍候重試"); } else { hTable.put(p); } } catch (IOException e) { e.printStackTrace(); } finally { if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } destroy(); } } /** * 刪除指定名稱的列簇 * * @param admin * @param family * @param tableName */ public static void deleteFamily(String family, String tableName) { try { init(); admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(family)); } catch (IOException e) { e.printStackTrace(); } finally { destroy(); } } /** * 刪除指定行 * * @param conn * @param key * @param tableName */ public static void deleteRow(String key, String tableName) { HTable hTable = null; try { init(); hTable = (HTable) conn.getTable(TableName.valueOf(tableName)); hTable.delete(new Delete(Bytes.toBytes(key))); } catch (IOException e) { e.printStackTrace(); } finally { if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } destroy(); } } /** * 刪除指定表名 * * @param admin * @param tableName */ public static void deleteTable(String tableName) { try { init(); // 在刪除一張表前,必須先使其失效 admin.disableTable(TableName.valueOf(tableName)); admin.deleteTable(TableName.valueOf(tableName)); } catch (IOException e) { e.printStackTrace(); } finally { destroy(); } } // 關閉連線 public static void destroy() { if (admin != null) { try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } @SuppressWarnings("deprecation") public static void saveClientData(ClientData cd) { String ymd = cd.getTime().toLocaleString().split(" ")[0]; Map<String, String> colVal = new HashMap<String, String>(); colVal.put("s", cd.getScreen()); colVal.put("m", cd.getModel()); colVal.put("c", cd.getCountry()); colVal.put("p", cd.getProvince()); colVal.put("ci", cd.getCity()); colVal.put("n", cd.getNetwork()); colVal.put("t", cd.getTime().toLocaleString()); addMoreRecord("clientdata_test2",cd.getUserID() + "-" + ymd, "d", colVal); } @SuppressWarnings("deprecation") public static void saveClientData3(ClientData cd) { SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss:SSS"); String time = sdf.format(cd.getTime()); String ymd = time.split(" ")[0]; String hms = time.split(" ")[1]; Map<String, String> colVal = new HashMap<String, String>(); colVal.put("s", cd.getScreen()); colVal.put("m", cd.getModel()); colVal.put("c", cd.getCountry()); colVal.put("p", cd.getProvince()); colVal.put("ci", cd.getCity()); colVal.put("n", cd.getNetwork()); colVal.put("t", cd.getTime().toLocaleString()); addOneRecord("clientdata_test3", cd.getUserID() + "-" + ymd,"d", hms, new Gson().toJson(cd)); } public static void main(String[] args) throws IOException { //1、建立表 /*String [] families = {"d"}; createTable("clientdata_test3", families);*/ //2、查詢hive /*List<ClientData> cds = ClientDataDao.listByID("b97da515fb26be121d09eab14363d013"); for(ClientData cd : cds) { String ymd = cd.getTime().toLocaleString().split(" ")[0]; Map<String, String> colVal = new HashMap<String, String>(); colVal.put("s", cd.getScreen()); colVal.put("m", cd.getModel()); colVal.put("c", cd.getCountry()); colVal.put("p", cd.getProvince()); colVal.put("ci", cd.getCity()); colVal.put("n", cd.getNetwork()); colVal.put("t", cd.getTime().toLocaleString()); addMoreRecord("clientdata_test1",cd.getUserID() + "-" + ymd, "d", colVal); }*/ /*ClientData cd1 = new ClientData(); cd1.setCity("shanghai"); cd1.setCountry("china"); cd1.setModel("xiaomi 6"); cd1.setNetwork("wifi"); cd1.setProvince("shanghai"); cd1.setScreen("1920*1080"); cd1.setTime(new Date(2018,3,14,12,01,32)); cd1.setUserID("chenjie"); saveClientData3(cd1); cd1.setNetwork("2G"); cd1.setTime(new Date(2018,3,14,15,00,00)); saveClientData3(cd1); cd1.setNetwork("4G"); cd1.setTime(new Date(2018,3,14,13,36,11)); saveClientData3(cd1); cd1.setNetwork("3G"); cd1.setTime(new Date(2018,3,14,14,00,21)); saveClientData3(cd1);*/ //3、查詢hbase //queryByRowKey("chenjie-3918-04-14", "clientdata_test3"); //4、刪除 //deleteTable("clientdata_test5"); } }
package cn.edu.shu.ces.chenjie.tianyi.hbase.dao.impl; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.util.Bytes; import com.google.gson.Gson; import cn.edu.shu.ces.chenjie.tianyi.hbase.dao.HBaseBaseDao; import cn.edu.shu.ces.chenjie.tianyi.hbase.utils.DateUtil; import cn.edu.shu.ces.chenjie.tianyi.hbase.utils.HbaseDataSourceUtil; import cn.edu.shu.ces.chenjie.tianyi.hive.model.ClientData; public class ClientDataDaoHBaseImpl extends HBaseBaseDao { public static final String TABLE_NAME = "clientdata_test5"; public static List<ClientData> list() { List<ClientData> cds = new ArrayList<ClientData>(); HTable hTable = null; ResultScanner scann = null; try { HbaseDataSourceUtil.init(); hTable = (HTable) HbaseDataSourceUtil.conn.getTable(TableName.valueOf(TABLE_NAME)); scann = hTable.getScanner(new Scan()); for (Result rs : scann) { System.out.println("RowKey為:" + new String(rs.getRow())); // 按cell進行迴圈 for (Cell cell : rs.rawCells()) { System.out.println("列簇為:" + new String(CellUtil.cloneFamily(cell))); System.out.println("列修飾符為:" + new String(CellUtil.cloneQualifier(cell))); String value = new String(CellUtil.cloneValue(cell)); System.out.println("值為:" + value); ClientData cd = new Gson().fromJson(value, ClientData.class); System.out.println(cd); cds.add(cd); } System.out.println("============================================="); } return cds; } catch (IOException e) { e.printStackTrace(); return cds; } finally { if (scann != null) { scann.close(); } if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } HbaseDataSourceUtil.destroy(); } } public static List<ClientData> list(int page, int pageSize) { // TODO Auto-generated method stub return null; } public static List<ClientData> listByID(String userID) { List<ClientData> cds = new ArrayList<ClientData>(); HTable hTable = null; ResultScanner scann = null; try { HbaseDataSourceUtil.init(); hTable = (HTable) HbaseDataSourceUtil.conn.getTable(TableName.valueOf(TABLE_NAME)); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); Date zero = new Date(0);//unix時間戳起點 String ymd1 = DateUtil.getYMD(zero); String ymd2 = DateUtil.getYMD(new Date());//現在 filterList.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(Bytes.toBytes(userID + "-" + ymd1)))); filterList.addFilter(new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(Bytes.toBytes(userID + "-" + ymd2)))); scann = hTable.getScanner(new Scan().setFilter(filterList)); for (Result rs : scann) { System.out.println("RowKey為:" + new String(rs.getRow())); // 按cell進行迴圈 for (Cell cell : rs.rawCells()) { System.out.println("列簇為:" + new String(CellUtil.cloneFamily(cell))); System.out.println("列修飾符為:" + new String(CellUtil.cloneQualifier(cell))); String value = new String(CellUtil.cloneValue(cell)); System.out.println("值為:" + value); ClientData cd = new Gson().fromJson(value, ClientData.class); //System.out.println(cd); cds.add(cd); } //System.out.println("============================================="); } return cds; } catch (IOException e) { e.printStackTrace(); return cds; } finally { if (scann != null) { scann.close(); } if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } HbaseDataSourceUtil.destroy(); } } public static List<ClientData> listByIDAndYearMounthDay(String userID, String year, String month, String day) { List<ClientData> cds = new ArrayList<ClientData>(); HTable hTable = null; ResultScanner scann = null; try { HbaseDataSourceUtil.init(); hTable = (HTable) HbaseDataSourceUtil.conn.getTable(TableName.valueOf(TABLE_NAME)); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); String ymd = "" + year + "-" + month + "-" + day; filterList.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(Bytes.toBytes(userID + "-" + ymd)))); filterList.addFilter(new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(Bytes.toBytes(userID + "-" + ymd)))); scann = hTable.getScanner(new Scan().setFilter(filterList)); for (Result rs : scann) { System.out.println("RowKey為:" + new String(rs.getRow())); // 按cell進行迴圈 for (Cell cell : rs.rawCells()) { System.out.println("列簇為:" + new String(CellUtil.cloneFamily(cell))); System.out.println("列修飾符為:" + new String(CellUtil.cloneQualifier(cell))); String value = new String(CellUtil.cloneValue(cell)); System.out.println("值為:" + value); ClientData cd = new Gson().fromJson(value, ClientData.class); //System.out.println(cd); cds.add(cd); } //System.out.println("============================================="); } return cds; } catch (IOException e) { e.printStackTrace(); return cds; } finally { if (scann != null) { scann.close(); } if (hTable != null) { try { hTable.close(); } catch (IOException e) { e.printStackTrace(); } } HbaseDataSourceUtil.destroy(); } } @SuppressWarnings("deprecation") public static void save(ClientData cd) { if(cd == null) return; String ymd = DateUtil.getYMD(cd.getTime()); String hms = DateUtil.getHMSS(cd.getTime()); Map<String, String> colVal = new HashMap<String, String>(); colVal.put("s", cd.getScreen()); colVal.put("m", cd.getModel()); colVal.put("c", cd.getCountry()); colVal.put("p", cd.getProvince()); colVal.put("ci", cd.getCity()); colVal.put("n", cd.getNetwork()); colVal.put("t", cd.getTime().toLocaleString()); HbaseDataSourceUtil.addOneRecord(TABLE_NAME, cd.getUserID() + "-" + ymd,"d", hms, new Gson().toJson(cd)); } @SuppressWarnings("deprecation") public static void saveList(List<ClientData> cds) throws IOException { HTable hTable = null; HbaseDataSourceUtil.init(); hTable = (HTable) HbaseDataSourceUtil.conn.getTable(TableName.valueOf(TABLE_NAME)); for(ClientData cd : cds) { String ymd = DateUtil.getYMD(cd.getTime()); String hms = DateUtil.getHMSS(cd.getTime()); Map<String, String> colVal = new HashMap<String, String>(); colVal.put("s", cd.getScreen()); colVal.put("m", cd.getModel()); colVal.put("c", cd.getCountry()); colVal.put("p", cd.getProvince()); colVal.put("ci", cd.getCity()); colVal.put("n", cd.getNetwork()); colVal.put("t", cd.getTime().toLocaleString()); //HbaseDataSourceUtil.addOneRecord(TABLE_NAME, cd.getUserID() + "-" + ymd,"d", hms, new Gson().toJson(cd)); Put p = new Put(Bytes.toBytes(cd.getUserID() + "-" + ymd)); p.addColumn(Bytes.toBytes("d"), Bytes.toBytes(hms), Bytes.toBytes(new Gson().toJson(cd))); if (p.isEmpty()) { System.out.println("資料插入異常,請確認資料完整性,稍候重試"); } else { hTable.put(p); } } if (hTable != null) { hTable.close(); } HbaseDataSourceUtil.destroy(); } public static void createTable() throws IOException { String [] families = {"d"}; HbaseDataSourceUtil.createTable(TABLE_NAME, families); } public static void main(String[] args) { //ClientDataDaoHBaseImpl.list(); //ClientDataDaoHBaseImpl.listByID("chenjie"); //ClientDataDaoHBaseImpl.listByIDAndYearMounthDay("chenjie","3918","04","14"); } }
