HIVE Data Warehouse Data Lineage Tool - SQL Parsing
阿新 · Published: 2019-01-04
I. Common problems in a data warehouse:
1. Two data reports are compared and their results differ significantly. Someone must then manually check the dimensions behind each metric, tracing where the data came from and under what processing conditions, before the root cause can be identified.
2. A base table needs a field changed for some reason. Assessing the impact of that change on the rest of the warehouse is slow and labor-intensive, and only then can a plan be drawn up.
II. Problem analysis:
Data travels a long way, passing through many processing steps and components before it reaches business users, so tracing it back to its source is genuinely hard. Yet metadata tracing matters for effective decision-making, strategy design, and discrepancy analysis. Both problems above are data lineage problems: the first is data tracing, and the second is impact analysis, the inverse of tracing.
III. Solution:
I implemented a data lineage analysis tool for the Hive warehouse that maps out the relationships between tables and fields, which solves both problems above.
- Main goal: parse the HQL statements in computation scripts, extract the input/output tables, input/output fields, and the corresponding processing conditions, and present the analysis.
- Approach: traverse the AST depth-first. When an operation token is encountered, identify the current operation; when a subclause is encountered, push the current state onto a stack and process the subclause, popping when it completes. While processing a subclause, if a subquery is encountered, save its information and relate it to its parent query, eventually forming a tree structure; when a field or processing condition is encountered, record it, assemble it into a Block, and recurse.
- Key points:
1. On TOK_TAB or TOK_TABREF, determine the table currently being operated on
2. Use a stack to track whether we are inside a join, and capture the join condition
3. Define a Block data structure; under where/select/join clauses, collect the fields and conditions below them into a Block
4. Define a ColLine data structure; on TOK_SUBQUERY, save the current subquery's information for use by its parent query
5. Using ColLine, when a TOK_UNION ends, merge and truncate the current column information
6. On select * or columns whose table is not explicit, consult the metadata to resolve them
7. Validate the parse results
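Before the implementation, here is a minimal usage sketch of the parser's public API. The demo class, the sample statement, and the expected outputs are illustrative assumptions, and MetaCache must be able to serve metadata for the referenced tables:
import com.xiaoju.products.parse.LineParser;
import com.xiaoju.products.bean.ColLine;
public class LineParserDemo {
    public static void main(String[] args) throws Exception { //parse() declares throws Exception
        LineParser parser = new LineParser();
        //an invented statement; any supported HQL works
        parser.parse("insert overwrite table db1.tab1 select c1, c2 from db1.tab2 where c3 > 0");
        System.out.println(parser.getInputTables());  //expected: [db1.tab2]
        System.out.println(parser.getOutputTables()); //expected: [db1.tab1]
        for (ColLine colLine : parser.getColLines()) {
            System.out.println(colLine); //one field-level lineage record per output column
        }
    }
}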
The code is as follows:
LineParser class
package com.xiaoju.products.parse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.Map.Entry;
import java.util.LinkedHashSet;
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import com.xiaoju.products.bean.Block;
import com.xiaoju.products.bean.ColLine;
import com.xiaoju.products.bean.QueryTree;
import com.xiaoju.products.exception.SQLParseException;
import com.xiaoju.products.exception.UnSupportedException;
import com.xiaoju.products.util.Check;
import com.xiaoju.products.util.MetaCache;
import com.xiaoju.products.util.NumberUtil;
import com.xiaoju.products.util.ParseUtil;
import com.xiaoju.products.util.PropertyFileUtil;
/**
* Hive SQL parser
*
* Purpose: parse HQL statements and extract the input/output tables, fields, and corresponding processing conditions, as the basis for field-level data lineage.
* Focus: capture the table and column operations inside SELECT statements; other operations are only resolved down to the field level.
* Approach: traverse the AST depth-first; on an operation token, identify the current operation; on a subclause, push the current state, process the subclause, and pop when done.
* While processing a subclause, save the information of any subquery encountered and relate it to its parent query, forming a tree structure;
* when a field or condition is encountered, record it, assemble it into a Block, and recurse.
* Key points:
* 1. On TOK_TAB or TOK_TABREF, determine the table currently being operated on
* 2. Use a stack to track whether we are inside a join, and capture the join condition
* 3. Define the Block data structure; under where/select/join clauses, collect the fields and conditions below them into a Block
* 4. Define the ColLine data structure; on TOK_SUBQUERY, save the current subquery's information for use by its parent query
* 5. Using ColLine, when a TOK_UNION ends, merge and truncate the current column information
* 6. On select * or columns whose table is not explicit, consult the metadata to resolve them
* 7. Validate the parse results
* Scope:
* 1. Standard SQL is supported
* 2. transform using script is not supported
*
* @author yangyangthomas
*
*/
public class LineParser {
private static final String SPLIT_DOT = ".";
private static final String SPLIT_COMMA = ",";
private static final String SPLIT_AND = "&";
private static final String TOK_EOF = "<EOF>";
private static final String CON_WHERE = "WHERE:";
private static final String TOK_TMP_FILE = "TOK_TMP_FILE";
private Map<String /*table*/, List<String/*column*/>> dbMap = new HashMap<String, List<String>>();
private List<QueryTree> queryTreeList = new ArrayList<QueryTree>(); //stores the tree of subquery relationships
private Stack<Set<String>> conditionsStack = new Stack<Set<String>>();
private Stack<List<ColLine>> colsStack = new Stack<List<ColLine>>();
private Map<String, List<ColLine>> resultQueryMap = new HashMap<String, List<ColLine>>();
private Set<String> conditions = new HashSet<String>(); //cache of where/join conditions
private List<ColLine> cols = new ArrayList<ColLine>(); //cache of columns within one subquery
private Stack<String> tableNameStack = new Stack<String>();
private Stack<Boolean> joinStack = new Stack<Boolean>();
private Stack<ASTNode> joinOnStack = new Stack<ASTNode>();
private Map<String, QueryTree> queryMap = new HashMap<String, QueryTree>();
private boolean joinClause = false;
private ASTNode joinOn = null;
private String nowQueryDB = "default"; //Hive's default database
private boolean isCreateTable = false;
//final results
private List<ColLine> colLines = new ArrayList<ColLine>();
private Set<String> outputTables = new HashSet<String>();
private Set<String> inputTables = new HashSet<String>();
private List<ColLine> tmpColLines = new ArrayList<ColLine>();
private Set<String> tmpOutputTables = new HashSet<String>();
private Set<String> tmpInputTables = new HashSet<String>();
public List<ColLine> getColLines() {
return colLines;
}
public Set<String> getOutputTables() {
return outputTables;
}
public Set<String> getInputTables() {
return inputTables;
}
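/**
* Depth-first traversal: prepare state for the current node, recurse into all
* children first, then process the node itself, and finally restore any pushed
* join/union state.
*/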
private void parseIteral(ASTNode ast) {
prepareToParseCurrentNodeAndChilds(ast);
parseChildNodes(ast);
parseCurrentNode(ast);
endParseCurrentNode(ast);
}
/**
* Parse the current node
* @param ast
*/
private void parseCurrentNode(ASTNode ast){
if (ast.getToken() != null) {
switch (ast.getToken().getType()) {
case HiveParser.TOK_CREATETABLE: //outputtable
isCreateTable = true;
String tableOut = fillDB(BaseSemanticAnalyzer.getUnescapedName((ASTNode) ast.getChild(0)));
tmpOutputTables.add(tableOut);
MetaCache.getInstance().init(tableOut); //initialize metadata for later use
break;
case HiveParser.TOK_TAB:// outputTable
String tableTab = BaseSemanticAnalyzer.getUnescapedName((ASTNode) ast.getChild(0));
String tableOut2 = fillDB(tableTab);
tmpOutputTables.add(tableOut2);
MetaCache.getInstance().init(tableOut2); //initialize metadata for later use
break;
case HiveParser.TOK_TABREF:// inputTable
ASTNode tabTree = (ASTNode) ast.getChild(0);
String tableInFull = fillDB((tabTree.getChildCount() == 1) ?
BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0))
: BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0))
+ SPLIT_DOT + BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(1))
);
String tableIn = tableInFull.substring(tableInFull.indexOf(SPLIT_DOT) + 1);
tmpInputTables.add(tableInFull);
MetaCache.getInstance().init(tableInFull); //initialize metadata for later use
queryMap.clear();
String alia = null;
if (ast.getChild(1) != null) { //(TOK_TABREF (TOK_TABNAME detail usersequence_client) c)
alia = ast.getChild(1).getText().toLowerCase();
QueryTree qt = new QueryTree();
qt.setCurrent(alia);
qt.getTableSet().add(tableInFull);
QueryTree pTree = getSubQueryParent(ast);
qt.setpId(pTree.getpId());
qt.setParent(pTree.getParent());
queryTreeList.add(qt);
if (joinClause && ast.getParent() == joinOn) { // TOK_SUBQUERY join TOK_TABREF: the TOK_SUBQUERY info must not be cleared here
for (QueryTree entry : queryTreeList) { //scope of the current query
if (qt.getParent().equals(entry.getParent())) {
queryMap.put(entry.getCurrent(), entry);
}
}
} else {
queryMap.put(qt.getCurrent(), qt);
}
} else {
alia = tableIn.toLowerCase();
QueryTree qt = new QueryTree();
qt.setCurrent(alia);
qt.getTableSet().add(tableInFull);
QueryTree pTree = getSubQueryParent(ast);
qt.setpId(pTree.getpId());
qt.setParent(pTree.getParent());
queryTreeList.add(qt);
if (joinClause && ast.getParent() == joinOn) {
for (QueryTree entry : queryTreeList) {
if (qt.getParent().equals(entry.getParent())) {
queryMap.put(entry.getCurrent(), entry);
}
}
} else {
queryMap.put(qt.getCurrent(), qt);
//handles queries like: select app.t1.c1, t1.c1 from t1
queryMap.put(tableInFull.toLowerCase(), qt);
}
}
break;
case HiveParser.TOK_SUBQUERY:
if (ast.getChildCount() == 2) {
String tableAlias = BaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(1).getText());
QueryTree qt = new QueryTree();
qt.setCurrent(tableAlias.toLowerCase());
qt.setColLineList(generateColLineList(cols, conditions));
QueryTree pTree = getSubQueryParent(ast);
qt.setId(generateTreeId(ast));
qt.setpId(pTree.getpId());
qt.setParent(pTree.getParent());
qt.setChildList(getSubQueryChilds(qt.getId()));
if (Check.notEmpty(qt.getChildList())) {
for (QueryTree cqt : qt.getChildList()) {
qt.getTableSet().addAll(cqt.getTableSet());
queryTreeList.remove(cqt); // remove the child node's info, now merged into the parent
}
}
queryTreeList.add(qt);
cols.clear();
queryMap.clear();
for (QueryTree _qt : queryTreeList) {
if (qt.getParent().equals(_qt.getParent())) { //keep only entries within the current subquery's scope
queryMap.put(_qt.getCurrent(), _qt);
}
}
}
break;
case HiveParser.TOK_SELEXPR: //handling of input/output fields
/**
* (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
* (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))
*
* (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
* (TOK_SELECT
* (TOK_SELEXPR (. (TOK_TABLE_OR_COL p) datekey) datekey)
* (TOK_SELEXPR (TOK_TABLE_OR_COL datekey))
* (TOK_SELEXPR (TOK_FUNCTIONDI count (. (TOK_TABLE_OR_COL base) userid)) buyer_count))
* (TOK_SELEXPR (TOK_FUNCTION when (> (. (TOK_TABLE_OR_COL base) userid) 5) (. (TOK_TABLE_OR_COL base) clienttype) (> (. (TOK_TABLE_OR_COL base) userid) 1) (+ (. (TOK_TABLE_OR_COL base) datekey) 5) (+ (. (TOK_TABLE_OR_COL base) clienttype) 1)) bbbaaa)
*/
//resolve the destination table of the insert
Tree tok_insert = ast.getParent().getParent();
Tree child = tok_insert.getChild(0).getChild(0);
String tName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) child.getChild(0));
String destTable = TOK_TMP_FILE.equals(tName) ? TOK_TMP_FILE : fillDB(tName);
//handles select a.*,* from t1 and select * from (select c1 as a,c2 from t1) t
if (ast.getChild(0).getType() == HiveParser.TOK_ALLCOLREF) {
String tableOrAlias = "";
if (ast.getChild(0).getChild(0) != null) {
tableOrAlias = ast.getChild(0).getChild(0).getChild(0).getText();
}
String[] result = getTableAndAlia(tableOrAlias);
String _alia = result[1];
boolean isSub = false; //handles nested select *
if (Check.notEmpty(_alia)) {
for (String string : _alia.split(SPLIT_AND)) { //look up while iterating
QueryTree qt = queryMap.get(string.toLowerCase());
if (null != qt) {
List<ColLine> colLineList = qt.getColLineList();
if (Check.notEmpty(colLineList)) {
isSub = true;
for (ColLine colLine : colLineList) {
cols.add(colLine);
}
}
}
}
}
if (!isSub) { //handles a plain select *
String nowTable = result[0];
String[] tableArr = nowTable.split(SPLIT_AND); //fact.test&test2
for (String tables : tableArr) {
String[] split = tables.split("\\.");
if (split.length > 2) {
throw new SQLParseException("parse table:" + nowTable);
}
List<String> colByTab = MetaCache.getInstance().getColumnByDBAndTable(tables);
for (String column : colByTab) {
Set<String> fromNameSet = new LinkedHashSet<String>();
fromNameSet.add(tables + SPLIT_DOT + column);
ColLine cl = new ColLine(column, tables + SPLIT_DOT + column, fromNameSet,
new LinkedHashSet<String>() , destTable, column);
cols.add(cl);
}
}
}
} else {
Block bk = getBlockIteral((ASTNode)ast.getChild(0));
String toNameParse = getToNameParse(ast, bk);
Set<String> fromNameSet = filterData(bk.getColSet());
ColLine cl = new ColLine(toNameParse, bk.getCondition(), fromNameSet, new LinkedHashSet<String>() , destTable, "");
cols.add(cl);
}
break;
case HiveParser.TOK_WHERE: //filter conditions from the WHERE clause
conditions.add(CON_WHERE + getBlockIteral((ASTNode) ast.getChild(0)).getCondition());
break;
default:
/**
* (or
* (> (. (TOK_TABLE_OR_COL p) orderid) (. (TOK_TABLE_OR_COL c) orderid))
* (and (= (. (TOK_TABLE_OR_COL p) a) (. (TOK_TABLE_OR_COL c) b))
* (= (. (TOK_TABLE_OR_COL p) aaa) (. (TOK_TABLE_OR_COL c) bbb))))
*/
//filter conditions from join ON clauses
if (joinOn != null && joinOn.getTokenStartIndex() == ast.getTokenStartIndex()
&& joinOn.getTokenStopIndex() == ast.getTokenStopIndex()) {
ASTNode astCon = (ASTNode)ast.getChild(2);
conditions.add(ast.getText().substring(4) + ":" + getBlockIteral(astCon).getCondition()); //substring(4) strips the "TOK_" prefix
break;
}
}
}
}
/**
* Find the parent subquery node of the current node
* @param ast
*/
private QueryTree getSubQueryParent(Tree ast) {
Tree _tree = ast;
QueryTree qt = new QueryTree();
while(!(_tree = _tree.getParent()).isNil()){
if(_tree.getType() == HiveParser.TOK_SUBQUERY){
qt.setpId(generateTreeId(_tree));
qt.setParent(BaseSemanticAnalyzer.getUnescapedName((ASTNode)_tree.getChild(1)));
return qt;
}
}
qt.setpId(-1);
qt.setParent("NIL");
return qt;
}
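/**
* Lightweight id for a subquery node: the sum of its token start and stop
* indexes. Collisions are unlikely within a single statement, though the sum
* is not strictly guaranteed to be unique.
*/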
private int generateTreeId(Tree tree) {
return tree.getTokenStartIndex() + tree.getTokenStopIndex();
}
/**
* Find the child subquery nodes of the given node id
* @param id
*/
private List<QueryTree> getSubQueryChilds(int id) {
List<QueryTree> list = new ArrayList<QueryTree>();
for (int i = 0; i < queryTreeList.size(); i++) {
QueryTree qt = queryTreeList.get(i);
if (id == qt.getpId()) {
list.add(qt);
}
}
return list;
}
/**
* Get the output name of a select expression (its alias, or the column name)
* @param ast
* @param bk
* @return
*/
private String getToNameParse(ASTNode ast, Block bk) {
String alia = "";
Tree child = ast.getChild(0);
if (ast.getChild(1) != null) { //has an alias: ip as alia
alia = ast.getChild(1).getText();
} else if (child.getType() == HiveParser.DOT //no alias: a.ip
&& child.getChild(0).getType() == HiveParser.TOK_TABLE_OR_COL
&& child.getChild(0).getChildCount() == 1
&& child.getChild(1).getType() == HiveParser.Identifier) {
alia = BaseSemanticAnalyzer.unescapeIdentifier(child.getChild(1).getText());
} else if (child.getType() == HiveParser.TOK_TABLE_OR_COL //no alias: ip
&& child.getChildCount() == 1
&& child.getChild(0).getType() == HiveParser.Identifier) {
alia = BaseSemanticAnalyzer.unescapeIdentifier(child.getChild(0).getText());
}
return alia;
}
/**
* Get the parsed block; used mainly for WHERE, JOIN, and SELECT clauses
* e.g. <p>where a=1
* <p>t1 join t2 on t1.col1=t2.col1 and t1.col2=123
* <p>select count(distinct col1) from t1
* @param ast
* @return
*/
private Block getBlockIteral(ASTNode ast) {
if (ast.getType() == HiveParser.KW_OR
||ast.getType() == HiveParser.KW_AND) {
Block bk1 = getBlockIteral((ASTNode)ast.getChild(0));
Block bk2 = getBlockIteral((ASTNode)ast.getChild(1));
bk1.getColSet().addAll(bk2.getColSet());
bk1.setCondition("(" + bk1.getCondition() + " " + ast.getText() + " " + bk2.getCondition() + ")");
return bk1;
} else if (ast.getType() == HiveParser.NOTEQUAL //comparison and arithmetic operators: > < like in ...
|| ast.getType() == HiveParser.EQUAL
|| ast.getType() == HiveParser.LESSTHAN
|| ast.getType() == HiveParser.LESSTHANOREQUALTO
|| ast.getType() == HiveParser.GREATERTHAN
|| ast.getType() == HiveParser.GREATERTHANOREQUALTO
|| ast.getType() == HiveParser.KW_LIKE
|| ast.getType() == HiveParser.DIVIDE
|| ast.getType() == HiveParser.PLUS
|| ast.getType() == HiveParser.MINUS
|| ast.getType() == HiveParser.STAR
|| ast.getType() == HiveParser.MOD
|| ast.getType() == HiveParser.AMPERSAND
|| ast.getType() == HiveParser.TILDE
|| ast.getType() == HiveParser.BITWISEOR
|| ast.getType() == HiveParser.BITWISEXOR) {
Block bk1 = getBlockIteral((ASTNode)ast.getChild(0));
if (ast.getChild(1) == null) { //unary operator, e.g. -1
bk1.setCondition(ast.getText() + bk1.getCondition());
} else {
Block bk2 = getBlockIteral((ASTNode)ast.getChild(1));
bk1.getColSet().addAll(bk2.getColSet());
bk1.setCondition(bk1.getCondition() + " " + ast.getText() + " " + bk2.getCondition());
}
return bk1;
} else if (ast.getType() == HiveParser.TOK_FUNCTIONDI) {
Block col = getBlockIteral((ASTNode) ast.getChild(1));
String condition = ast.getChild(0).getText();
col.setCondition(condition + "(distinct (" + col.getCondition() +"))");
return col;
} else if (ast.getType() == HiveParser.TOK_FUNCTION){
String fun = ast.getChild(0).getText();
Block col = ast.getChild(1) == null ? new Block() : getBlockIteral((ASTNode) ast.getChild(1));
if ("when".equalsIgnoreCase(fun)) {
col.setCondition(getWhenCondition(ast));
Set<Block> processChilds = processChilds(ast, 1);
col.getColSet().addAll(bkToCols(col, processChilds));
return col;
} else if("IN".equalsIgnoreCase(fun)) {
col.setCondition(col.getCondition() + " in (" + blockCondToString(processChilds(ast, 2)) + ")");
return col;
} else if("TOK_ISNOTNULL".equalsIgnoreCase(fun) //isnull isnotnull
|| "TOK_ISNULL".equalsIgnoreCase(fun)){
col.setCondition(col.getCondition() + " " + fun.toLowerCase().substring(4));
return col;
} else if("BETWEEN".equalsIgnoreCase(fun)){
col.setCondition(getBlockIteral((ASTNode) ast.getChild(2)).getCondition()
+ " between " + getBlockIteral((ASTNode) ast.getChild(3)).getCondition()
+ " and " + getBlockIteral((ASTNode) ast.getChild(4)).getCondition());
return col;
}
Set<Block> processChilds = processChilds(ast, 1);
col.getColSet().addAll(bkToCols(col, processChilds));
col.setCondition(fun +"("+ blockCondToString(processChilds) + ")");
return col;
} else if(ast.getType() == HiveParser.LSQUARE){ //map/array subscript access
Block column = getBlockIteral((ASTNode) ast.getChild(0));
Block key = getBlockIteral((ASTNode) ast.getChild(1));
column.setCondition(column.getCondition() +"["+ key.getCondition() + "]");
return column;
} else {
return parseBlock(ast);
}
}
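/** Collect the union of the source-column sets of the given blocks. */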
private Set<String> bkToCols(Block col, Set<Block> processChilds) {
Set<String> set = new LinkedHashSet<String>(processChilds.size());
for (Block colLine : processChilds) {
if (Check.notEmpty(colLine.getColSet())) {
set.addAll(colLine.getColSet());
}
}
return set;
}
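/** Join the condition strings of the given blocks with commas. */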
private String blockCondToString(Set<Block> processChilds) {
StringBuilder sb = new StringBuilder();
for (Block colLine : processChilds) {
sb.append(colLine.getCondition()).append(SPLIT_COMMA);
}
if (sb.length()>0) {
sb.setLength(sb.length()-1);
}
return sb.toString();
}
/**
* Parse a CASE WHEN expression
* @param ast
* @return e.g. case when c1>100 then col1 when c1>0 then col2 else col3 end
*/
private String getWhenCondition(ASTNode ast) {
int cnt = ast.getChildCount();
StringBuilder sb = new StringBuilder();
for (int i = 1; i < cnt; i++) {
String condition = getBlockIteral((ASTNode)ast.getChild(i)).getCondition();
if (i == 1) {
sb.append("(case when " + condition);
} else if (i == cnt-1) { //else
sb.append(" else " + condition + " end)");
} else if (i % 2 == 0){ //then
sb.append(" then " + condition);
} else {
sb.append(" when " + condition);
}
}
return sb.toString();
}
/**
* Store the subquery's alias and column info
* @param sqlIndex
* @param tableAlias
*/
private void putResultQueryMap(int sqlIndex, String tableAlias) {
List<ColLine> list = generateColLineList(cols, conditions);
String key = sqlIndex == 0 ? tableAlias : tableAlias + sqlIndex; //no index suffix needed when there is no name clash
resultQueryMap.put(key, list);
}
private List<ColLine> generateColLineList(List<ColLine> cols, Set<String> conditions) {
List<ColLine> list = new ArrayList<ColLine>();
for (ColLine entry : cols) {
entry.getConditionSet().addAll(conditions);
list.add(ParseUtil.cloneColLine(entry));
}
return list;
}
/**
* Check for abnormal columns; returns true when the "column" is not a real column.
* Normal: a as col, a
* Abnormal: 1, 'a' (numbers or string literals in the column position)
*/
private boolean notNormalCol(String column) {
return Check.isEmpty(column) || NumberUtil.isNumeric(column)
|| (column.startsWith("\"") && column.endsWith("\""))
|| (column.startsWith("\'") && column.endsWith("\'"));
}
/**
* Parse the subtrees starting at the given child index
* @param ast
* @param startIndex the child index to start from
* @return
*/
private Set<Block> processChilds(ASTNode ast,int startIndex) {
int cnt = ast.getChildCount();
Set<Block> set = new LinkedHashSet<Block>();
for (int i = startIndex; i < cnt; i++) {
Block bk = getBlockIteral((ASTNode) ast.getChild(i));
if (Check.notEmpty(bk.getCondition()) || Check.notEmpty(bk.getColSet())){
set.add(bk);
}
}
return set;
}
/**
* Parse out a column name (or a literal such as a number or string) and its condition
* @param ast
* @return
*/
private Block parseBlock(ASTNode ast) {
if (ast.getType() == HiveParser.DOT
&& ast.getChild(0).getType() == HiveParser.TOK_TABLE_OR_COL
&& ast.getChild(0).getChildCount() == 1
&& ast.getChild(1).getType() == HiveParser.Identifier) {
String column = BaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(1).getText());
String alia = BaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0).getChild(0).getText());
return getBlock(column, alia);
} else if (ast.getType() == HiveParser.TOK_TABLE_OR_COL
&& ast.getChildCount() == 1
&& ast.getChild(0).getType() == HiveParser.Identifier) {
String column = ast.getChild(0).getText();
return getBlock(column, null);
} else if (ast.getType() == HiveParser.Number
|| ast.getType() == HiveParser.StringLiteral
|| ast.getType() == HiveParser.Identifier) {
Block bk = new Block();
bk.setCondition(ast.getText());
bk.getColSet().add(ast.getText());
return bk;
}
return new Block();
}
/**
* Build the block info for a column name and table alias;
* aliases defined in the current subquery may be used here
* @param column
* @param alia
* @return
*/
private Block getBlock(String column, String alia) {
String[] result = getTableAndAlia(alia);
String tableArray = result[0];
String _alia = result[1];
for (String string : _alia.split(SPLIT_AND)) { //look up while iterating
QueryTree qt = queryMap.get(string.toLowerCase());
if (qt != null && Check.notEmpty(column)) { //guard against unknown aliases
for (ColLine colLine : qt.getColLineList()) {
if (column.equalsIgnoreCase(colLine.getToNameParse())) {
Block bk = new Block();
bk.setCondition(colLine.getColCondition());
bk.setColSet(ParseUtil.cloneSet(colLine.getFromNameSet()));
return bk;
}
}
}
}
String _realTable = tableArray;
int cnt = 0; //number of metadata columns matching this field; more than one match means the SQL is ambiguous
for (String tables : tableArray.split(SPLIT_AND)) { //look up the table's columns from the metadata
String[] split = tables.split("\\.");
if (split.length > 2) {
throw new SQLParseException("parse table:" + tables);
}
List<String> colByTab = MetaCache.getInstance().getColumnByDBAndTable(tables);
for (String col : colByTab) {
if (column.equalsIgnoreCase(col)) {
_realTable = tables;
cnt++;
}
}
}
// the cnt == 0 case (column not found) is validated in the Validater class
if (cnt > 1) { //ambiguity check
throw new SQLParseException("SQL is ambiguity, column: " + column + " tables:" + tableArray);
}
Block bk = new Block();
bk.setCondition(_realTable + SPLIT_DOT + column);
bk.getColSet().add(_realTable + SPLIT_DOT + column);
return bk;
}
/**
* Filter out useless "columns": e.g. col1,123,'2013',col2 ==>> col1,col2
* @param colSet
* @return
*/
private Set<String> filterData(Set<String> colSet){
Set<String> set = new LinkedHashSet<String>();
for (String string : colSet) {
if (!notNormalCol(string)) {
set.add(string);
}
}
return set;
}
/**
* Parse all child nodes
* @param ast
*/
private void parseChildNodes(ASTNode ast){
int numCh = ast.getChildCount();
if (numCh > 0) {
for (int num = 0; num < numCh; num++) {
ASTNode child = (ASTNode) ast.getChild(num);
parseIteral(child);
}
}
}
/**
* Prepare to parse the current node and its children
* @param ast
*/
private void prepareToParseCurrentNodeAndChilds(ASTNode ast){
if (ast.getToken() != null) {
switch (ast.getToken().getType()) {
case HiveParser.TOK_SWITCHDATABASE:
System.out.println("nowQueryDB changed " + nowQueryDB+ " to " +ast.getChild(0).getText());
nowQueryDB = ast.getChild(0).getText();
break;
case HiveParser.TOK_TRANSFORM:
throw new UnSupportedException("no support transform using clause");
case HiveParser.TOK_RIGHTOUTERJOIN:
case HiveParser.TOK_LEFTOUTERJOIN:
case HiveParser.TOK_JOIN:
case HiveParser.TOK_LEFTSEMIJOIN:
case HiveParser.TOK_MAPJOIN:
case HiveParser.TOK_FULLOUTERJOIN:
case HiveParser.TOK_UNIQUEJOIN:
joinStack.push(joinClause);
joinClause = true;
joinOnStack.push(joinOn);
joinOn = ast;
break;
}
}
}
/**
* Finish parsing the current node
* @param ast
*/
private void endParseCurrentNode(ASTNode ast){
if (ast.getToken() != null) {
Tree parent = ast.getParent();
switch (ast.getToken().getType()) { //a join clause ends here: restore the previous join state
case HiveParser.TOK_RIGHTOUTERJOIN:
case HiveParser.TOK_LEFTOUTERJOIN:
case HiveParser.TOK_JOIN:
case HiveParser.TOK_LEFTSEMIJOIN:
case HiveParser.TOK_MAPJOIN:
case HiveParser.TOK_FULLOUTERJOIN:
case HiveParser.TOK_UNIQUEJOIN:
joinClause = joinStack.pop();
joinOn = joinOnStack.pop();
break;
case HiveParser.TOK_QUERY:
processUnionStack(ast, parent); //this query may be a branch of a union
case HiveParser.TOK_INSERT:
case HiveParser.TOK_SELECT:
break;
case HiveParser.TOK_UNION: //merge the column info of the union branches
mergeUnionCols();
processUnionStack(ast, parent); //this union may itself be a branch of an outer union
break;
}
}
}
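/**
* Merge the column info of a union's branches. At this point cols holds the
* select lists of both branches back to back, so column i of the first branch
* absorbs column i + colNum of the second (names, source columns, conditions),
* and the absorbed entries are removed. Multi-way unions reduce to this
* pairwise case because Hive nests TOK_UNION nodes.
*/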
private void mergeUnionCols() {
validateUnion(cols);
int size = cols.size();
int colNum = size / 2;
List<ColLine> list = new ArrayList<ColLine>(colNum);
for (int i = 0; i < colNum; i++) { //merge the columns
ColLine col = cols.get(i);
for (int j = i + colNum; j < size; j = j + colNum) {
ColLine col2 = cols.get(j);
list.add(col2);
if (notNormalCol(col.getToNameParse()) && !notNormalCol(col2.getToNameParse())) {
col.setToNameParse(col2.getToNameParse());
}
col.getFromNameSet().addAll(col2.getFromNameSet());
col.setColCondition(col.getColCondition() + SPLIT_AND + col2.getColCondition());
Set<String> conditionSet = ParseUtil.cloneSet(col.getConditionSet());
conditionSet.addAll(col2.getConditionSet());
conditionSet.addAll(conditions);
col.getConditionSet().addAll(conditionSet);
}
}
cols.removeAll(list); //remove the entries that were merged
}
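/**
* Maintain the union stacks. When the first branch of a TOK_UNION finishes,
* its column/condition caches are pushed and cleared so the second branch
* starts clean; when the second branch finishes, the saved caches are popped
* and the first branch's columns are prepended, leaving both branches back to
* back for mergeUnionCols.
*/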
private void processUnionStack(ASTNode ast, Tree parent) {
boolean isNeedAdd = parent.getType() == HiveParser.TOK_UNION;
if (isNeedAdd) {
if (parent.getChild(0) == ast && parent.getChild(1) != null) { //has a following sibling (this is the first branch)
//push
conditionsStack.push(ParseUtil.cloneSet(conditions));
conditions.clear();
colsStack.push(ParseUtil.cloneList(cols));
cols.clear();
} else { //no following sibling (this is the second branch)
//pop
if (!conditionsStack.isEmpty()) {
conditions.addAll(conditionsStack.pop());
}
if (!colsStack.isEmpty()) {
cols.addAll(0, colsStack.pop());
}
}
}
}
private void parseAST(ASTNode ast) {
parseIteral(ast);
}
public void parse(String sqlAll) throws Exception{
if (Check.isEmpty(sqlAll)) {
return;
}
startParseAll(); //clear the final result sets
int i = 0; //index of the current sql statement
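//split the script into statements at each ";" that is not preceded by a
//backslash (negative lookbehind), so an escaped "\;" does not end a statement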
for (String sql : sqlAll.split("(?<!\\\\);")) {
ParseDriver pd = new ParseDriver();
String trim = sql.toLowerCase().trim();
if (trim.startsWith("set") || trim.startsWith("add") || Check.isEmpty(trim)) {
continue;
}
ASTNode ast = pd.parse(sql);
if ("local".equals(PropertyFileUtil.getProperty("environment"))) {
System.out.println(ast.toStringTree());
}
prepareParse();
parseAST(ast);
endParse(++i);
}
}
/**
* Clear the results of the previous run
*/
private void startParseAll() {
colLines.clear();
outputTables.clear();
inputTables.clear();
}
private void prepareParse() {
isCreateTable = false;
dbMap.clear();
queryMap.clear();
queryTreeList.clear();
//temporary results
tmpColLines.clear();
tmpOutputTables.clear();
tmpInputTables.clear();
conditionsStack.clear(); //where/join condition cache
colsStack.clear(); //column cache within one subquery
resultQueryMap.clear();
conditions.clear(); //where/join condition cache
cols.clear(); //column cache within one subquery
tableNameStack.clear();
joinStack.clear();
joinOnStack.clear();
joinClause = false;
joinOn = null;
}
/**
* Post-processing after a statement has been fully parsed
*/
private void endParse(int sqlIndex) {
putResultQueryMap(sqlIndex, TOK_EOF);
putDBMap();
setColLineList();
setOutInputTableSet();
}
/***
* Build the field-level mappings of the output tables
*/
private void setColLineList() {
Map<String, List<ColLine>> map = new HashMap<String, List<ColLine>>();
for (Entry<String, List<ColLine>> entry : resultQueryMap.entrySet()) {
if (entry.getKey().startsWith(TOK_EOF)) {
List<ColLine> value = entry.getValue();
for (ColLine colLine : value) {
List<ColLine> list = map.get(colLine.getToTable());
if (Check.isEmpty(list)) {
list = new ArrayList<ColLine>();
map.put(colLine.getToTable(), list);
}
list.add(colLine);
}
}
}
for (Entry<String, List<ColLine>> entry : map.entrySet()) {
String table = entry.getKey();
List<ColLine> pList = entry.getValue();
List<String> dList = dbMap.get(table);
int metaSize = Check.isEmpty(dList) ? 0 : dList.size();
for (int i = 0; i < pList.size(); i++) { //attach the corresponding metadata fields in order
ColLine clp = pList.get(i);
String colName = null;
if (i < metaSize) {
colName = table + SPLIT_DOT + dList.get(i);
}
if (isCreateTable && TOK_TMP_FILE.equals(table)) {
for (String string : tmpOutputTables) {
table = string;
}
}
ColLine colLine = new ColLine(clp.getToNameParse(), clp.getColCondition(),
clp.getFromNameSet(), clp.getConditionSet(), table, colName);
colLines.add(colLine);
}
}
}
/***
* Populate the final input/output table sets
*/
private void setOutInputTableSet() {
outputTables.addAll(ParseUtil.cloneSet(tmpOutputTables));
inputTables.addAll(ParseUtil.cloneSet(tmpInputTables));
}
private void putDBMap() {
for (String table : tmpOutputTables) {
List<String> list = MetaCache.getInstance().getColumnByDBAndTable(table);
dbMap.put(table, list);
}
}
/**
* Fill in the database prefix:
* table1 ==>> db1.table1
* db1.table1 ==>> db1.table1
* db2.t1&t2 ==>> db2.t1&db1.t2
* @param tables