Hadoop 讀取 HDFS 並操作 HBase：把 HDFS 檔案內容按 group by 聚合後寫入 HBase
阿新 • 發佈：2019-02-05
package org.ucas.hbase; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; public class Hw1Grp2 { //hbase 表名 private static final String TABLE_NAME = "Result"; //列簇名 private static final String COLMUN_FAMILY = "res"; private HTable table; public HTable getTable() { return table; } public void setTable(HTable table) { this.table = table; } public BufferedReader readHdfs(String file) throws IOException, URISyntaxException{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(file), conf); Path path = new Path(file); FSDataInputStream inStream = fs.open(path); BufferedReader in = new BufferedReader(new InputStreamReader(inStream)); return in; } public HTable createTable(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ Configuration configuration = HBaseConfiguration.create(); HBaseAdmin hAdmin = new HBaseAdmin(configuration); if(hAdmin.tableExists(tableName)) { System.out.println("table is exists, delete exists table"); hAdmin.disableTable(tableName); hAdmin.deleteTable(tableName); } else { System.out.println("table not exists"); } HTableDescriptor htd = new 
HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor cf = new HColumnDescriptor(COLMUN_FAMILY); htd.addFamily(cf); hAdmin.createTable(htd); hAdmin.close(); System.out.println("table create"); return new HTable(configuration,tableName); } public void insert(String rowKey, String family, String qualifier, String value) throws IOException { Put put = new Put(rowKey.getBytes()); put.add(family.getBytes(),qualifier.getBytes(),value.getBytes()); table.put(put); } public void handleData(String file, int rowKey, Map<String, Integer> args) throws IOException, URISyntaxException { String colStr = null; BufferedReader buffer = readHdfs(file); //rowKey和count雜湊表 Map<String, Integer> mapCount = new HashMap<String, Integer>(); //rowKey 的某列sum雜湊表 Map<String, Integer> mapSum = new HashMap<String, Integer>(); //max雜湊表 Map<String, Integer> mapMax = new HashMap<String, Integer>(); //avg雜湊表 Map<String, Float> mapAvg = new HashMap<String, Float>(); //min雜湊表 Map<String, Integer> mapMin = new HashMap<String, Integer>(); int maxCol = -1, avgCol = -1, sumCol = -1, minCol = -1, countCol = -1; //根據傳進來的引數設定需要進行的聚合函式 if(args.containsKey("count")) { countCol = args.get("count"); } if(args.containsKey("avg")) { avgCol = args.get("avg"); } if(args.containsKey("max")) { maxCol = args.get("max"); } if(args.containsKey("sum")) { sumCol = args.get("sum"); } if(args.containsKey("min")) { minCol = args.get("min"); } //算出需要用到的聚合函式 String str; while((str = buffer.readLine()) != null) { String[] col = str.split("\\|"); if(mapCount.containsKey(col[rowKey])) { mapCount.put(col[rowKey], mapCount.get(col[rowKey]) +1 ); } else { mapCount.put(col[rowKey], 1); } if(sumCol != -1) { if(mapSum.containsKey(col[rowKey])) { mapSum.put(col[rowKey], mapSum.get(col[rowKey]) +Integer.parseInt(col[sumCol]) ); } else { mapSum.put(col[rowKey], Integer.parseInt(col[sumCol])); } } if(avgCol != -1) { if(mapAvg.containsKey(col[rowKey])) { mapAvg.put(col[rowKey], mapAvg.get(col[rowKey]) +Float.parseFloat(col[avgCol]) 
); } else { mapAvg.put(col[rowKey], Float.parseFloat(col[avgCol])); } } if(maxCol != -1) { if(mapMax.containsKey(col[rowKey])) { if(Integer.parseInt(col[maxCol]) > mapMax.get(col[rowKey])) mapMax.put(col[rowKey], Integer.parseInt(col[maxCol])); } else { mapMax.put(col[rowKey], Integer.parseInt(col[maxCol])); } } if(minCol != -1) { if(mapMin.containsKey(col[rowKey])) { if(Integer.parseInt(col[minCol]) < mapMin.get(col[rowKey])) mapMin.put(col[rowKey], Integer.parseInt(col[minCol])); } else { mapMin.put(col[rowKey], Integer.parseInt(col[minCol])); } } } //從hashmap中插入資料表 for(String key : mapCount.keySet()) { if(countCol != -1) { colStr = "count"; insert(key, "res", colStr, mapCount.get(key) + ""); } if(avgCol != -1) { colStr = "avg(R" + avgCol + ")"; mapAvg.put(key, (float)Math.round(mapAvg.get(key)/mapCount.get(key)*100)/100); insert(key, "res", colStr, mapAvg.get(key) + ""); } if(maxCol != -1) { colStr = "max(R" + maxCol + ")"; insert(key, "res", colStr, mapMax.get(key) + ""); } if(minCol != -1) { colStr = "min(R" + minCol + ")"; insert(key, "res", colStr, mapMin.get(key) + ""); } if(sumCol != -1) { colStr = "sum(R" + sumCol + ")"; insert(key, "res", colStr, mapSum.get(key) + ""); } } System.out.println("handle data success"); } public static void main(String[] args) throws IOException, URISyntaxException { /** * 命令引數解析,解析出檔名,group by的列,需要求的聚合函式 */ if(args.length != 3) { System.out.println("input args length error"); System.exit(0); } String file = StringUtils.substringAfter(args[0], "="); if(file == null) { System.out.println("args error"); System.exit(0); } String keyNum = StringUtils.substringAfter(args[1], "R"); if(keyNum == null) { System.out.println("args error"); System.exit(0); } int rowKey = Integer.parseInt(keyNum); String colsName = StringUtils.substringAfter(args[2], ":"); if(colsName == null) { System.out.println("args error"); System.exit(0); } String[] cmdStr = colsName.split(","); Map<String, Integer> cmd = new HashMap<String, Integer>(); for(int i = 
0; i < cmdStr.length; i++) { if(!cmdStr[i].equals("count")) { cmd.put(StringUtils.substringBefore(cmdStr[i], "("), Integer.parseInt(StringUtils.substringBetween(cmdStr[i],"R", ")"))); } else { cmd.put(cmdStr[i], rowKey); } } System.out.println("file:" + file); for(String key : cmd.keySet()) { System.out.println(key + ":" + cmd.get(key)); } Hw1Grp2 h = new Hw1Grp2(); h.setTable(h.createTable(TABLE_NAME)); h.handleData(file, rowKey, cmd); System.out.println("program is over"); } }