Java 實現 Presto 線上查詢 Hive
阿新 • • 發佈:2018-11-28
@Override public String queryHiveData(String sql, int page, int pageSize, String jobID, long startTime) { SQLQueryModel sqlQueryModel = new SQLQueryModel(); if (!sql.equals(sqlUtil.sqlEmptyCheck(sqlQueryModel, sql, startTime))) { return sqlUtil.sqlEmptyCheck(sqlQueryModel, sql, startTime); } // 新增資料庫狀態 onLineQueryMapper.addQueryLog(queryLogTableUtil.addQueryLog(jobID, jobID, configUtil.readConfig("presto.hive.online.name"), configUtil.readConfig("search.engine.presto"), Constant.OnLineType, sql, "", "", dateTimeUtil.timeStampToDateTime(String.valueOf(startTime)), "", "", "", configUtil.readConfig("job.running"))); Connection connection = null; Statement stmt = null; ResultSet resultSet = null; boolean flag = true; int colCount = 0; List<SQLQuerySubColModel> columnsList = new ArrayList<>(); List<String> columns = new ArrayList<>(); List<List<String>> data = new ArrayList<>(); try { Class.forName(Constant.PrestoDriver); connection = DriverManager.getConnection( configUtil.readConfig("presto.hive.url"), configUtil.readConfig("presto.hive.username"), null); // 新增連線物件 ConnectUtil.connectionMap.put(jobID, connection); stmt = connection.createStatement(); resultSet = stmt.executeQuery(sql); while (resultSet.next()) { if (flag) { ResultSetMetaData metaData = resultSet.getMetaData(); colCount = metaData.getColumnCount(); for (int i = 1; i <= colCount; i++) { columns.add(metaData.getColumnLabel(i)); // 初始化欄位列表 SQLQuerySubColModel sqlQuerySubColModel = new SQLQuerySubColModel(); sqlQuerySubColModel.setKey(String.valueOf(i)); sqlQuerySubColModel.setName(metaData.getColumnLabel(i)); columnsList.add(sqlQuerySubColModel); } flag = false; } List<String> list = new ArrayList<>(); for (int j = 0; j < colCount; j++) { list.add(String.valueOf(resultSet.getObject(j + 1))); } data.add(list); } // 儲存查詢結果資料 queryResultDataServiceImp.StoreSearchResult(columns, data, jobID); } catch (Exception error) { LOG.error(error.getMessage()); 
onLineQueryMapper.updateQueryLog(queryLogTableUtil.updateQueryLog(jobID, "", "", dateTimeUtil.timeStampToDateTime(dateTimeUtil.currentTimeStamp()), String.valueOf(Long.parseLong(dateTimeUtil.currentTimeStamp()) - startTime), error.getMessage(), configUtil.readConfig("job.failed"))); // 移除已結束的連線物件 ConnectUtil.reConnectObj(ConnectUtil.connectionMap, jobID); sqlQueryModel.setDataList(null); sqlQueryModel.setRunTime(Long.parseLong(dateTimeUtil.currentTimeStamp()) - startTime); sqlQueryModel.setWords(null); if (error.getMessage().length() > 100) { sqlQueryModel.setErrMsg(error.getMessage().substring(0, 100)); } else { sqlQueryModel.setErrMsg(error.getMessage()); } sqlQueryModel.setJobStatus("Failed"); return returnResultModel.generateResult( Integer.parseInt(configUtil.readConfig("return.success")), configUtil.readConfig("return.msg.success"), sqlQueryModel); } finally { try { if (stmt != null) { stmt.close(); } if (resultSet != null) { resultSet.close(); } if (connection != null) { connection.close(); } // 移除已結束的連線物件 ConnectUtil.reConnectObj(ConnectUtil.connectionMap, jobID); } catch (Exception error) { LOG.error(error.getMessage()); onLineQueryMapper.updateQueryLog(queryLogTableUtil.updateQueryLog(jobID, "", "", dateTimeUtil.timeStampToDateTime(dateTimeUtil.currentTimeStamp()), String.valueOf(Long.parseLong(dateTimeUtil.currentTimeStamp()) - startTime), error.getMessage(), configUtil.readConfig("job.failed"))); sqlQueryModel.setDataList(null); sqlQueryModel.setRunTime(Long.parseLong(dateTimeUtil.currentTimeStamp()) - startTime); sqlQueryModel.setWords(null); if (error.getMessage().length() > 100) { sqlQueryModel.setErrMsg(error.getMessage().substring(0, 100)); } else { sqlQueryModel.setErrMsg(error.getMessage()); } sqlQueryModel.setJobStatus("Failed"); // 移除已結束的連線物件 ConnectUtil.reConnectObj(ConnectUtil.connectionMap, jobID); return returnResultModel.generateResult( Integer.parseInt(configUtil.readConfig("return.success")), configUtil.readConfig("return.msg.success"), 
sqlQueryModel); } } // 對返回資料的分頁操作 int startIndex = page * pageSize - pageSize; int endIndex = page * pageSize; int flagCount = data.size(); if (data.size() < (page * pageSize)) { flagCount = Math.abs((data.size() + pageSize) - (page * pageSize)); } SQLQuerySubModel dataList = new SQLQuerySubModel(); dataList.setTotal(data.size()); List<Map<String, Object>> rows = new ArrayList<>(); Map<String, Integer> colLenList = Maps.newHashMap(); if (!data.isEmpty()) { int count = 0; for (int index = startIndex; index < endIndex; index++) { if (count == flagCount) break; Map<String, Object> row = Maps.newHashMap(); row.put("key", index); for (int i = 0; i < colCount; i++) { row.put(columns.get(i), data.get(index).get(i)); // 計算每個欄位的最大值長度 if (startIndex == count) { colLenList.put(columns.get(i), data.get(index).get(i).length()); } else { LOG.info(""); if (data.get(index).get(i).length() > colLenList.get(columns.get(i))) { colLenList.put(columns.get(i), data.get(index).get(i).length()); } } } rows.add(row); count += 1; } dataList.setRows(rows); } sqlQueryModel.setDataList(dataList); for (SQLQuerySubColModel tempCol : columnsList) { tempCol.setLen(colLenList.get(tempCol.getName())); } sqlQueryModel.setWords(columnsList); sqlQueryModel.setRunTime(Long.parseLong(dateTimeUtil.currentTimeStamp()) - startTime); sqlQueryModel.setErrMsg(""); sqlQueryModel.setJobStatus("Success"); DateTimeUtil dateTimeUtil = new DateTimeUtil(); String timeStamp = jobID.split("_")[0]; String dateTime = dateTimeUtil.timeStampToDateTime(timeStamp).substring(0, 10); // 更新資料庫狀態 onLineQueryMapper.updateQueryLog(queryLogTableUtil.updateQueryLog(jobID, configUtil.readConfig("download.file.path") + dateTime + "/" + jobID + ".csv", "", dateTimeUtil.timeStampToDateTime(dateTimeUtil.currentTimeStamp()), String.valueOf(Long.parseLong(dateTimeUtil.currentTimeStamp()) - startTime), "", configUtil.readConfig("job.success"))); return 
returnResultModel.generateResult(Integer.parseInt(configUtil.readConfig("return.success")), configUtil.readConfig("return.msg.success"), sqlQueryModel); }