1. 程式人生 > >hbase0.98 endpoint實現group分組求和程式碼

hbase0.98 endpoint實現group分組求和程式碼



public class GroupClient {


public static void main(String[] args) throws ServiceException, Throwable {

Winutils.createWinutils();
// TODO Auto-generated method stub
System.out.println("begin.....");
long begin_time = System.currentTimeMillis();
Configuration config = HBaseConfiguration.create();
// String master_ip="192.168.150.128";
//String master_ip = args[0];
//String zk_ip = args[1];
String table_name = "xun_traffic";
//config.set("hbase.zookeeper.property.clientPort", "2181");
//config.set("hbase.zookeeper.quorum", zk_ip);
//config.set("hbase.master", master_ip + ":600000");



HTable table = new HTable(config, table_name);


Map<byte[], String> results = table.coprocessorService(GroupService.class, null, null,
new Batch.Call<GroupProtos.GroupService, String>() {

public String call(GroupProtos.GroupService counter)
throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<GroupProtos.GroupResponse> rpcCallback = new BlockingRpcCallback<GroupProtos.GroupResponse>();
 
GroupProtos.GroupRequest request = GroupProtos.GroupRequest.getDefaultInstance();
 
  
Builder newBuilder = request.newBuilder();
 
List list = new ArrayList();
list.add("id");
list.add("date");
 list.add("f");
newBuilder.addAllParams(list);

GroupRequest build = newBuilder.build();
byte[] info = build.toByteArray();
GroupRequest parseFrom = request.parseFrom(info);
 

counter.getCountByGroup(controller, parseFrom,rpcCallback);

GroupProtos.GroupResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return (response != null && response.hasKeyValue()) ? response.getKeyValue() : "";
}

});
table.close();


if (results.size() > 0) {
//System.out.println("results==="+results.values());
System.out.println("results size==="+results.size());

 Map<String, Long> combin = new HashMap<String, Long>();
int i = 0;
 
for(Entry<byte[], String> entry : results.entrySet()){


//System.out.println("====="+new String(entry.getKey())+":"+entry.getValue());

ObjectMapper objectMapper = new ObjectMapper();
   Map<String, Long> readValue = objectMapper.readValue(entry.getValue(),Map.class);

   if (i == 0) {
                    combin.putAll(readValue);
                } else {
                    putAll(combin, readValue);
                }
                i++;
}

//打印出分組求和的結果
System.out.println("map====="+combin);

} else {
System.out.println("沒有任何返回結果");
}


long end_time = System.currentTimeMillis();
System.out.println("end:" + (end_time - begin_time));

}
//因為返回是每個region上的資料,所以要將所有的region合併
public static void putAll(Map<String, Long> combin, Map<String, Long> map2) {
       Set<Map.Entry<String, Long>> set2 = map2.entrySet();


       for (Iterator<Map.Entry<String, Long>> it2 = set2.iterator(); it2
               .hasNext();) {
           Map.Entry<String, Long> entry2 = it2.next();
           String key = entry2.getKey();
       
//           Long value = entry2.getValue();
           Long value = Long.parseLong(entry2.getValue()+"");
           


           Set<Map.Entry<String, Long>> set = combin.entrySet();


           boolean flag = false;
           for (Iterator<Map.Entry<String, Long>> it = set.iterator(); it
                   .hasNext();) {
               Map.Entry<String, Long> entry = it.next();


               String key2 = entry.getKey();
               Long value2 = Long.parseLong(entry.getValue()+"");


               if (key.equals(key2)) {
                   value = value + value2;
                   combin.put(key, value);
                   flag = true;
               }


           }
           if (!flag) {
               combin.put(key, value);
           }
       }
   }


}