HBase 0.98 Endpoint 實現 group 分組求和的程式碼
阿新 • • 發佈:2018-12-31
public class GroupClient {
/**
 * Calls the group-sum endpoint coprocessor on the {@code xun_traffic} table, merges the
 * per-region JSON results into a single map and prints that map and the elapsed time.
 *
 * @param args command-line arguments (unused)
 * @throws ServiceException declared for the coprocessor RPC
 * @throws Throwable anything raised by the coprocessor invocation or by JSON parsing
 */
public static void main(String[] args) throws ServiceException, Throwable {
    // Project helper; presumably prepares Hadoop's winutils on Windows -- TODO confirm.
    Winutils.createWinutils();
    System.out.println("begin.....");
    long begin_time = System.currentTimeMillis();
    // Cluster settings are taken from the HBase configuration on the classpath. Set
    // hbase.zookeeper.quorum / hbase.zookeeper.property.clientPort on config to override.
    Configuration config = HBaseConfiguration.create();
    String table_name = "xun_traffic";
    HTable table = new HTable(config, table_name);
    Map<byte[], String> results;
    try {
        // null start/end row keys: no row-range restriction on the regions that are called.
        results = table.coprocessorService(GroupService.class, null, null,
                new Batch.Call<GroupProtos.GroupService, String>() {
                    @Override
                    public String call(GroupProtos.GroupService counter) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<GroupProtos.GroupResponse> rpcCallback =
                                new BlockingRpcCallback<GroupProtos.GroupResponse>();
                        // Parameters handed to the endpoint; their meaning is defined by the
                        // server-side implementation, which is not part of this file.
                        List<String> params = new ArrayList<String>();
                        params.add("id");
                        params.add("date");
                        params.add("f");
                        GroupRequest request =
                                GroupProtos.GroupRequest.newBuilder().addAllParams(params).build();
                        counter.getCountByGroup(controller, request, rpcCallback);
                        GroupProtos.GroupResponse response = rpcCallback.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return (response != null && response.hasKeyValue()) ? response.getKeyValue() : "";
                    }
                });
    } finally {
        // Release the table even when the coprocessor call fails.
        table.close();
    }
    if (results.size() > 0) {
        System.out.println("results size===" + results.size());
        Map<String, Long> combin = new HashMap<String, Long>();
        ObjectMapper objectMapper = new ObjectMapper();
        for (Entry<byte[], String> entry : results.entrySet()) {
            String json = entry.getValue();
            if (json == null || json.length() == 0) {
                // The callback above returns "" when a region sent no key/value payload;
                // there is nothing to parse or merge for it.
                continue;
            }
            @SuppressWarnings("unchecked")
            Map<String, Long> readValue = objectMapper.readValue(json, Map.class);
            // Every region, including the first, goes through putAll so that the values
            // stored in combin are real Long instances rather than whatever boxed type
            // the JSON parser produced.
            putAll(combin, readValue);
        }
        // Print the grouped sums.
        System.out.println("map=====" + combin);
    } else {
        System.out.println("沒有任何返回結果");
    }
    long end_time = System.currentTimeMillis();
    System.out.println("end:" + (end_time - begin_time));
}
/**
 * Adds one region's partial sums to the running totals.
 *
 * Each coprocessor result only covers a single region, so the results of all regions have
 * to be merged: a key already present in {@code combin} has the incoming value added to its
 * total, a new key is inserted as is. Every value written to {@code combin} is a {@code Long}.
 *
 * @param combin running totals, updated in place
 * @param map2 partial sums of one region; may hold other integral boxes (for example
 *             {@code Integer}) when it was produced by an untyped JSON parse
 * @throws NumberFormatException if a value that has to be read is not an integral number
 */
public static void putAll(Map<String, Long> combin, Map<String, Long> map2) {
    // Read through wildcard views so the compiler never inserts a cast to Long: values
    // coming from an untyped JSON parse may really be Integer instances.
    Map<String, ?> totals = combin;
    Map<String, ?> partial = map2;
    for (Map.Entry<String, ?> entry : partial.entrySet()) {
        String key = entry.getKey();
        long sum = toLong(entry.getValue());
        // Direct lookup instead of rescanning the whole accumulator for every entry.
        if (totals.containsKey(key)) {
            sum += toLong(totals.get(key));
        }
        combin.put(key, Long.valueOf(sum));
    }
}

/** Converts an integral box, or anything whose string form is a decimal long, to a long. */
private static long toLong(Object raw) {
    if (raw instanceof Long || raw instanceof Integer
            || raw instanceof Short || raw instanceof Byte) {
        return ((Number) raw).longValue();
    }
    return Long.parseLong(String.valueOf(raw));
}
}