程式人生 > hadoop讀寫hdfs和操作hbase,把hbase內容按group by排序

hadoop讀寫hdfs和操作hbase,把hbase內容按group by排序


package org.ucas.hbase;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

/**
 * Reads a '|'-delimited text file through Hadoop's {@link FileSystem}, groups its rows by one
 * column and writes per-group aggregates (count, avg, max, min, sum) into an HBase table,
 * using the group-by value as the row key.
 */
public class Hw1Grp2 {

	/** Name of the HBase table that receives the results; it is dropped and recreated each run. */
	private static final String TABLE_NAME = "Result";
	/** Column family under which every aggregate column is stored. */
	private static final String COLUMN_FAMILY = "res";
	/** Charset used for reading the input file and for encoding HBase row keys/qualifiers/values. */
	private static final String ENCODING = "UTF-8";
	/** Column index marking an aggregate function that was not requested. */
	private static final int NOT_REQUESTED = -1;

	/** Table written by {@link #insert}; must be set before {@link #handleData} runs. */
	private HTable table;

	/** Returns the HBase table that {@link #insert} writes to ({@code null} until set). */
	public HTable getTable() {
		return table;
	}

	/** Sets the HBase table that {@link #insert} writes to. */
	public void setTable(HTable table) {
		this.table = table;
	}

	/**
	 * Opens {@code file} through the Hadoop file system matching its URI and returns a UTF-8
	 * line reader over it.
	 *
	 * @param file Hadoop path or URI of the input file
	 * @return a reader that the caller owns and must close
	 * @throws IOException if the file system or the file cannot be opened
	 * @throws URISyntaxException declared for compatibility; not thrown by this implementation
	 */
	public BufferedReader readHdfs(String file) throws IOException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(file), conf);
		FSDataInputStream inStream = fs.open(new Path(file));
		// Explicit charset: the one-argument InputStreamReader uses the platform default.
		return new BufferedReader(new InputStreamReader(inStream, ENCODING));
	}

	/**
	 * (Re)creates {@code tableName} with the single column family {@link #COLUMN_FAMILY}.
	 * An existing table of that name is disabled and deleted first.
	 *
	 * @param tableName name of the table to create
	 * @return a client handle for the new table; the caller should close it when done
	 */
	public HTable createTable(String tableName) throws MasterNotRunningException,
			ZooKeeperConnectionException, IOException {
		Configuration configuration = HBaseConfiguration.create();
		HBaseAdmin hAdmin = new HBaseAdmin(configuration);
		try {
			if (hAdmin.tableExists(tableName)) {
				System.out.println("table is exists, delete exists table");
				hAdmin.disableTable(tableName);
				hAdmin.deleteTable(tableName);
			} else {
				System.out.println("table not exists");
			}
			HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
			htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
			hAdmin.createTable(htd);
		} finally {
			// Release the admin even if one of the DDL calls above fails.
			hAdmin.close();
		}
		System.out.println("table create");
		return new HTable(configuration, tableName);
	}

	/**
	 * Writes a single cell {@code rowKey / family:qualifier = value} into {@link #getTable()}.
	 * All strings are encoded as UTF-8.
	 *
	 * @throws IllegalStateException if no table has been set
	 * @throws IOException if the put fails
	 */
	public void insert(String rowKey, String family, String qualifier, String value) throws IOException {
		if (table == null) {
			throw new IllegalStateException("table is not set; call setTable(...) before insert");
		}
		Put put = new Put(rowKey.getBytes(ENCODING));
		put.add(family.getBytes(ENCODING), qualifier.getBytes(ENCODING), value.getBytes(ENCODING));
		table.put(put);
	}

	/**
	 * Groups the rows of {@code file} by column {@code rowKey} and writes the requested
	 * aggregates of every group into {@link #getTable()}, one HBase row per group.
	 *
	 * <p>Lines are split on '|', column indices are 0-based and empty lines are skipped.
	 * Result qualifiers are {@code count}, {@code avg(R<i>)}, {@code max(R<i>)},
	 * {@code min(R<i>)} and {@code sum(R<i>)}; averages are rounded to two decimals.
	 *
	 * @param file   input file, opened via {@link #readHdfs}
	 * @param rowKey index of the group-by column; its value becomes the HBase row key
	 * @param args   requested aggregates: function name ("count", "avg", "max", "min", "sum")
	 *               mapped to the column it applies to (for "count" only presence matters)
	 * @throws NumberFormatException if an aggregated column value is not numeric
	 */
	public void handleData(String file, int rowKey, Map<String, Integer> args)
			throws IOException, URISyntaxException {
		int countCol = requestedColumn(args, "count");
		int avgCol = requestedColumn(args, "avg");
		int maxCol = requestedColumn(args, "max");
		int sumCol = requestedColumn(args, "sum");
		int minCol = requestedColumn(args, "min");

		Map<String, GroupStats> groups;
		BufferedReader reader = readHdfs(file);
		try {
			groups = aggregate(reader, rowKey, avgCol, maxCol, minCol, sumCol);
		} finally {
			reader.close();
		}

		for (Map.Entry<String, GroupStats> entry : groups.entrySet()) {
			String key = entry.getKey();
			GroupStats stats = entry.getValue();
			if (countCol != NOT_REQUESTED) {
				insert(key, COLUMN_FAMILY, "count", String.valueOf(stats.count));
			}
			if (avgCol != NOT_REQUESTED) {
				// Math.round(double) yields a long, so large averages are not clamped to
				// Integer.MAX_VALUE as Math.round(float) would do.
				double avg = Math.round(stats.avgSum / stats.count * 100) / 100.0;
				insert(key, COLUMN_FAMILY, "avg(R" + avgCol + ")", String.valueOf(avg));
			}
			if (maxCol != NOT_REQUESTED) {
				insert(key, COLUMN_FAMILY, "max(R" + maxCol + ")", String.valueOf(stats.max));
			}
			if (minCol != NOT_REQUESTED) {
				insert(key, COLUMN_FAMILY, "min(R" + minCol + ")", String.valueOf(stats.min));
			}
			if (sumCol != NOT_REQUESTED) {
				insert(key, COLUMN_FAMILY, "sum(R" + sumCol + ")", String.valueOf(stats.sum));
			}
		}
		System.out.println("handle data success");
	}

	/** Running aggregates for a single group-by key. */
	private static final class GroupStats {
		long count;
		long sum;
		/** Sum of the avg column; divided by {@code count} when the result is written. */
		double avgSum;
		long max = Long.MIN_VALUE;
		long min = Long.MAX_VALUE;
	}

	/** Returns the column index requested for {@code function}, or {@link #NOT_REQUESTED}. */
	private static int requestedColumn(Map<String, Integer> args, String function) {
		Integer column = args.get(function);
		return column == null ? NOT_REQUESTED : column.intValue();
	}

	/**
	 * Reads every line of {@code reader} and accumulates per-group statistics keyed by the
	 * value of column {@code rowKey}. Only columns not equal to {@link #NOT_REQUESTED} are parsed.
	 */
	private static Map<String, GroupStats> aggregate(BufferedReader reader, int rowKey,
			int avgCol, int maxCol, int minCol, int sumCol) throws IOException {
		Map<String, GroupStats> groups = new HashMap<String, GroupStats>();
		String line;
		while ((line = reader.readLine()) != null) {
			if (line.length() == 0) {
				// A blank line (e.g. at end of file) would otherwise create a bogus "" group
				// or fail with an index error.
				continue;
			}
			String[] col = line.split("\\|");
			String key = col[rowKey];
			GroupStats stats = groups.get(key);
			if (stats == null) {
				stats = new GroupStats();
				groups.put(key, stats);
			}
			stats.count++;
			if (sumCol != NOT_REQUESTED) {
				// long accumulator: an int sum overflows silently on large inputs.
				stats.sum += Long.parseLong(col[sumCol]);
			}
			if (avgCol != NOT_REQUESTED) {
				// double accumulator: a float sum loses precision after many rows.
				stats.avgSum += Double.parseDouble(col[avgCol]);
			}
			if (maxCol != NOT_REQUESTED) {
				stats.max = Math.max(stats.max, Long.parseLong(col[maxCol]));
			}
			if (minCol != NOT_REQUESTED) {
				stats.min = Math.min(stats.min, Long.parseLong(col[minCol]));
			}
		}
		return groups;
	}

	/**
	 * Command-line entry point.
	 *
	 * <p>Expects exactly three arguments. The text after the first '=' in {@code args[0]} is the
	 * input file. The text after the first 'R' in {@code args[1]} is the 0-based group-by column.
	 * The text after the first ':' in {@code args[2]} is a comma-separated list of aggregates,
	 * each either {@code count} or {@code name(R<index>)}, for example:
	 * {@code R=input.tbl groupby:R2 res:count,avg(R3),max(R4)}.
	 * Exits with status 1 on malformed arguments.
	 */
	public static void main(String[] args) throws IOException, URISyntaxException {
		if (args.length != 3) {
			System.out.println("input args length error");
			System.exit(1);
			return;
		}
		String file;
		int rowKey;
		Map<String, Integer> cmd;
		try {
			// StringUtils.substringAfter returns "" (not null) when the separator is missing,
			// so emptiness is what signals a malformed argument.
			file = requireValue(StringUtils.substringAfter(args[0], "="));
			rowKey = parseColumnIndex(StringUtils.substringAfter(args[1], "R"));
			cmd = parseAggregates(StringUtils.substringAfter(args[2], ":"), rowKey);
		} catch (IllegalArgumentException e) {
			// Also catches NumberFormatException (a subclass) from malformed column indices.
			System.out.println("args error");
			System.exit(1);
			return;
		}

		System.out.println("file:" + file);
		for (Map.Entry<String, Integer> entry : cmd.entrySet()) {
			System.out.println(entry.getKey() + ":" + entry.getValue());
		}

		Hw1Grp2 h = new Hw1Grp2();
		h.setTable(h.createTable(TABLE_NAME));
		try {
			h.handleData(file, rowKey, cmd);
		} finally {
			// Release the table handle even if aggregation fails.
			// NOTE(review): HTable.close() is also expected to flush any client-side write
			// buffer — confirm for the HBase version in use.
			h.getTable().close();
		}
		System.out.println("program is over");
	}

	/**
	 * Parses an aggregate list such as {@code count,avg(R3),max(R4)} into a map from function
	 * name to column index. {@code count} is mapped to {@code rowKey}.
	 *
	 * @throws IllegalArgumentException if the list or any column index is missing or malformed
	 */
	private static Map<String, Integer> parseAggregates(String spec, int rowKey) {
		Map<String, Integer> cmd = new HashMap<String, Integer>();
		for (String item : requireValue(spec).split(",")) {
			if (item.equals("count")) {
				cmd.put(item, rowKey);
			} else {
				cmd.put(StringUtils.substringBefore(item, "("),
						parseColumnIndex(StringUtils.substringBetween(item, "R", ")")));
			}
		}
		return cmd;
	}

	/**
	 * Parses a non-negative column index.
	 *
	 * @throws IllegalArgumentException if {@code text} is null, empty, not a number or negative
	 */
	private static int parseColumnIndex(String text) {
		int index = Integer.parseInt(requireValue(text));
		if (index < 0) {
			throw new IllegalArgumentException("negative column index: " + index);
		}
		return index;
	}

	/** Returns {@code value}, or throws IllegalArgumentException if it is null or empty. */
	private static String requireValue(String value) {
		if (StringUtils.isEmpty(value)) {
			throw new IllegalArgumentException("missing argument value");
		}
		return value;
	}
}