Hive 3.x SemanticAnalyzer and CalcitePlanner materialized-view-related source code - 02
Continued from part 1, "Hive 3.x SemanticAnalyzer and CalcitePlanner materialized-view-related source code".
SemanticAnalyzer
void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) {
....
// 1. Generate Resolved Parse tree from syntax tree
boolean needsTransform = needsTransform();
// 2. Gen OP Tree from resolved Parse Tree
Operator sinkOp = genOPTree(ast, plannerCtx); // enters CalcitePlanner::genOPTree
//--- to be continued
}
CalcitePlanner::genOPTree
The input parameter here is Hive's ASTNode.
Operator genOPTree(ASTNode ast, PlannerContext plannerCtx){
......
// 1. Gen Optimized AST
ASTNode newAST = getOptimizedAST();
}
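As a side note, the ASTNode that genOPTree receives can be reproduced standalone with Hive's ParseDriver. A minimal sketch (the query text and table name are made up for illustration):
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

public class AstDumpDemo {
  public static void main(String[] args) throws ParseException {
    // parse a query the same way the compiler front end does
    ParseDriver pd = new ParseDriver();
    ASTNode ast = pd.parse("SELECT deptno, count(*) FROM emp GROUP BY deptno");
    // dump() pretty-prints the tree (TOK_QUERY / TOK_FROM / TOK_INSERT ...)
    System.out.println(ast.dump());
  }
}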
CalcitePlanner::getOptimizedAST
ASTNode getOptimizedAST() throws SemanticException {
// optimize the query with Calcite, producing a Calcite RelNode
RelNode optimizedOptiqPlan = logicalPlan();
// convert the optimized RelNode back into a Hive ASTNode
ASTNode optiqOptimizedAST = ASTConverter.convert(optimizedOptiqPlan, resultSchema,
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COLUMN_ALIGNMENT));
return optiqOptimizedAST;
}
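Before the conversion back to an ASTNode, the Calcite plan can be inspected with RelOptUtil.toString, the same utility the planner uses for its debug logging further below. A small hypothetical helper:
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.SqlExplainLevel;

// hypothetical debugging helper, not part of Hive
static void dumpPlan(RelNode optimizedOptiqPlan) {
  System.out.println(
      RelOptUtil.toString(optimizedOptiqPlan, SqlExplainLevel.ALL_ATTRIBUTES));
}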
CalcitePlanner::logicalPlan
RelNode logicalPlan() throws SemanticException {
RelNode optimizedOptiqPlan = null;
CalcitePlannerAction calcitePlannerAction = null;
/**
* ColumnAccessInfo holds a map of table name to names of accessed columns
*/
this.columnAccessInfo = new ColumnAccessInfo();
/**
* CalcitePlannerAction is code responsible for Calcite plan generation and optimization.
*/
calcitePlannerAction = new CalcitePlannerAction(
prunedPartitions,
ctx.getOpContext().getColStatsCache(),
this.columnAccessInfo);
// Calcite optimizes the plan; this kicks off CalcitePlanner::CalcitePlannerAction::apply()
optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
.newConfigBuilder().typeSystem(new HiveTypeSystemImpl()).build());
return optimizedOptiqPlan;
}
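Frameworks.withPlanner is a stock Calcite entry point: it constructs a RelOptCluster and a root schema and hands them to the action's apply() method, which is why CalcitePlannerAction implements Frameworks.PlannerAction. A minimal sketch outside of Hive (the toy action just proves the cluster has been created):
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Frameworks;

String plannerName = Frameworks.withPlanner(
    new Frameworks.PlannerAction<String>() {
      @Override
      public String apply(RelOptCluster cluster, RelOptSchema relOptSchema,
          SchemaPlus rootSchema) {
        // CalcitePlannerAction generates and optimizes the plan here
        return cluster.getPlanner().getClass().getSimpleName();
      }
    },
    Frameworks.newConfigBuilder().build());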
CalcitePlanner::CalcitePlannerAction
CalcitePlannerAction is code responsible for Calcite plan generation and optimization.
CalcitePlanner::CalcitePlannerAction::apply
@Override
public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlus rootSchema) {
RelNode calciteGenPlan = null;
RelNode calcitePreCboPlan = null;
RelNode calciteOptimizedPlan = null;
subqueryId = -1;
/*
* recreate cluster, so that it picks up the additional traitDef.
* These sets keep track of whether a subquery is correlated and contains an aggregate,
* since that case is special-cased when it is rewritten in SubqueryRemoveRule:
* Set<RelNode> corrScalarRexSQWithAgg = new HashSet<RelNode>();
* Set<RelNode> scalarAggNoGbyNoWin = new HashSet<RelNode>();
* conf is the Hive configuration
*/
// HiveVolcanoPlanner
RelOptPlanner planner = createPlanner(conf, corrScalarRexSQWithAgg, scalarAggNoGbyNoWin);
final RexBuilder rexBuilder = cluster.getRexBuilder();
final RelOptCluster optCluster = RelOptCluster.create(planner, rexBuilder);
this.cluster = optCluster;
this.relOptSchema = relOptSchema;
// 1. Gen Calcite Plan
perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
...
// getQB() -- QB: implementation of the query block, i.e. the result of syntax analysis
// build calciteGenPlan (a RelNode) from the QB
calciteGenPlan = genLogicalPlan(getQB(), true, null, null);
...
// Validate query materialization (materialized views, query results caching).
// This check needs to occur before constant folding, which may remove some
// function calls from the query plan.
HiveRelOpMaterializationValidator matValidator = new HiveRelOpMaterializationValidator();
matValidator.validateQueryMaterialization(calciteGenPlan);
if (!matValidator.isValidMaterialization()) {
String reason = matValidator.getInvalidMaterializationReason();
setInvalidQueryMaterializationReason(reason);
}
// Create executor
RexExecutor executorProvider = new HiveRexExecutorImpl(optCluster);
calciteGenPlan.getCluster().getPlanner().setExecutor(executorProvider);
// We need to get the ColumnAccessInfo and viewToTableSchema for views.
HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null,
HiveRelFactories.HIVE_BUILDER.create(optCluster, null), this.columnAccessInfo,
this.viewProjectToTableSchema);
fieldTrimmer.trim(calciteGenPlan);
// Create and set MD provider
HiveDefaultRelMetadataProvider mdProvider = new HiveDefaultRelMetadataProvider(conf);
RelMetadataQuery.THREAD_PROVIDERS.set(
JaninoRelMetadataProvider.of(mdProvider.getMetadataProvider()));
//Remove subquery
calciteGenPlan = hepPlan(calciteGenPlan, false, mdProvider.getMetadataProvider(), null,
new HiveSubQueryRemoveRule(conf));
calciteGenPlan = HiveRelDecorrelator.decorrelateQuery(calciteGenPlan);
LOG.debug("Plan after decorrelation:\n" + RelOptUtil.toString(calciteGenPlan));
// 2. Apply pre-join order optimizations
calcitePreCboPlan = applyPreJoinOrderingTransforms(calciteGenPlan,
mdProvider.getMetadataProvider(), executorProvider);
// 3. Materialized view based rewriting
//--- to be continued ....
// We disable it for CTAS and MV creation queries (trying to avoid any problem
// due to data freshness)
if (conf.getBoolVar(ConfVars.HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING) &&
!getQB().isMaterializedView() && !ctx.isLoadingMaterializedView() && !getQB().isCTAS()) {
calcitePreCboPlan = applyMaterializedViewRewriting(planner,
calcitePreCboPlan, mdProvider.getMetadataProvider(), executorProvider);
}
// 4. Apply join order optimizations: reordering MST algorithm
// 5. Run other optimizations that do not need stats
// 6. Run aggregate-join transpose (cost based)
// If it failed because of missing stats, we continue with
// the rest of optimizations
// 7.convert Join + GBy to semijoin
// run this rule at later stages, since many calcite rules can't deal with semijoin
// 8. convert SemiJoin + GBy to SemiJoin
// 9. Get rid of sq_count_check if group by key is constant (HIVE-)
// 10. Run rule to fix windowing issue when it is done over
// aggregation columns (HIVE-10627)
// 11. Apply Druid transformation rules
// 12. Run rules to aid in translation from Calcite tree to Hive tree
// 12.2. Introduce exchange operators below join/multijoin operators
return calciteOptimizedPlan;
}
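The flag gating step 3 above is an ordinary HiveConf boolean (in Hive 3 it maps to the hive.materializedview.rewriting setting). A small sketch of reading and forcing it programmatically:
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

HiveConf conf = new HiveConf();
// enable automatic materialized-view-based rewriting
conf.setBoolVar(ConfVars.HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING, true);
boolean autoRewrite =
    conf.getBoolVar(ConfVars.HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING);
// even with the flag on, apply() skips the rewrite for CTAS and
// MV-creation queries, as the source comment above notes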
createPlanner
private static RelOptPlanner createPlanner(
HiveConf conf, Set<RelNode> corrScalarRexSQWithAgg, Set<RelNode> scalarAggNoGbyNoWin) {
// split-size / memory config parameters, not the focus here
final Double maxSplitSize = (double) HiveConf.getLongVar(
conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
final Double maxMemory = (double) HiveConf.getLongVar(
conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
HiveAlgorithmsConf algorithmsConf = new HiveAlgorithmsConf(maxSplitSize, maxMemory);
// registry used by the rewrite rules to record their state
/***
public class HiveRulesRegistry {
private SetMultimap<RelOptRule, RelNode> registryVisited;
private ListMultimap<RelNode,Set<String>> registryPushedPredicates;
}
*/
HiveRulesRegistry registry = new HiveRulesRegistry();
// 配置引數 "timeZone" -> "Asia/Shanghai" ; "materializationsEnabled" -> "false"
Properties calciteConfigProperties = new Properties();
calciteConfigProperties.setProperty(
CalciteConnectionProperty.TIME_ZONE.camelName(),
conf.getLocalTimeZone().getId());
calciteConfigProperties.setProperty(
CalciteConnectionProperty.MATERIALIZATIONS_ENABLED.camelName(),
Boolean.FALSE.toString());
// CalciteConnectionConfig
CalciteConnectionConfig calciteConfig = new CalciteConnectionConfigImpl(calciteConfigProperties);
// config values, e.g. isCorrelatedColumns = true; heuristicMaterializationStrategy = true
boolean isCorrelatedColumns = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_CORRELATED_MULTI_KEY_JOINS);
boolean heuristicMaterializationStrategy = HiveConf.getVar(conf,
HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_SELECTION_STRATEGY).equals("heuristic");
// planner context
HivePlannerContext confContext = new HivePlannerContext(algorithmsConf, registry, calciteConfig,
corrScalarRexSQWithAgg, scalarAggNoGbyNoWin,
new HiveConfPlannerContext(isCorrelatedColumns, heuristicMaterializationStrategy));
return HiveVolcanoPlanner.createPlanner(confContext);
}
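Note that materializationsEnabled is explicitly set to false: presumably Hive does this because it drives materialized-view rewriting itself (step 3 in apply()) rather than letting Calcite's built-in materialization machinery do it. The properties can be read back through the standard Calcite config API:
import java.util.Properties;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;

Properties props = new Properties();
props.setProperty(CalciteConnectionProperty.TIME_ZONE.camelName(), "Asia/Shanghai");
props.setProperty(CalciteConnectionProperty.MATERIALIZATIONS_ENABLED.camelName(), "false");
CalciteConnectionConfig cfg = new CalciteConnectionConfigImpl(props);
// cfg.timeZone() -> "Asia/Shanghai"; cfg.materializationsEnabled() -> false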
HiveVolcanoPlanner
public class HiveVolcanoPlanner extends VolcanoPlanner {
private static final boolean ENABLE_COLLATION_TRAIT = true;
private final boolean isHeuristic;
/** Creates a HiveVolcanoPlanner. */
public HiveVolcanoPlanner(HivePlannerContext conf) {
// set the cost model; see HiveCost for details
super(HiveCost.FACTORY, conf);
isHeuristic = conf.unwrap(HiveConfPlannerContext.class).isHeuristicMaterializationStrategy();
}
public static RelOptPlanner createPlanner(HivePlannerContext conf) {
final VolcanoPlanner planner = new HiveVolcanoPlanner(conf);
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
if (ENABLE_COLLATION_TRAIT) {
planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
}
return planner;
}
@Override
public void registerClass(RelNode node) {
if (node instanceof DruidQuery) {
// Special handling for Druid rules here as otherwise
// planner will add Druid rules with logical builder
addRule(HiveDruidRules.FILTER);
....
return;
}
super.registerClass(node);
}
/**
* The method extends the logic of the super method to decrease
* the cost of the plan if it contains materialized views
* (heuristic).
*/
public RelOptCost getCost(RelNode rel, RelMetadataQuery mq) {
.......
return cost;
}
}
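The elided getCost body implements the heuristic described in its javadoc. A minimal sketch of the idea inside a planner subclass (NOT Hive's exact code; subtreeScansMaterializedView is a hypothetical helper):
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

@Override
public RelOptCost getCost(RelNode rel, RelMetadataQuery mq) {
  RelOptCost cost = super.getCost(rel, mq);
  // if the strategy is heuristic and this subtree reads a materialized view,
  // shrink the cost so the Volcano planner favors the rewritten plan
  if (isHeuristic && subtreeScansMaterializedView(rel)) { // hypothetical helper
    cost = cost.multiplyBy(RelOptUtil.EPSILON);
  }
  return cost;
}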
genLogicalPlan
Builds a Calcite RelNode from the QB produced by syntax analysis.
private RelNode genLogicalPlan(QB qb, boolean outerMostQB,
ImmutableMap<String, Integer> outerNameToPosMap,
RowResolver outerRR) throws SemanticException {
RelNode srcRel = null;
RelNode filterRel = null;
RelNode gbRel = null;
RelNode gbHavingRel = null;
RelNode selectRel = null;
RelNode obRel = null;
RelNode limitRel = null;
// 1. Build Rel For Src (SubQuery, TS, Join)
// 1.1. Recurse over the subqueries to fill the subquery part of the plan
// 1.2 Recurse over all the source tables
// 1.3 process join
// 1.3.1 process hints
// 1.3.2 process the actual join
// 2. Build Rel for where Clause
// 3. Build Rel for GB Clause
// 4. Build Rel for GB Having Clause
// 5. Build Rel for Select Clause
// 6. Build Rel for OB Clause
// 7. Build Rel for Limit Clause
// 8. In case this QB corresponds to a subquery, modify its RR to point to the subquery alias
return srcRel;
}
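The numbered steps assemble the plan bottom-up. For intuition, here is roughly the same pipeline expressed with Calcite's RelBuilder (a sketch: the schema, the table "emp", and its columns "deptno"/"sal" are assumptions for illustration):
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;

static RelNode buildPlan(SchemaPlus schema) {
  FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(schema).build();
  RelBuilder b = RelBuilder.create(config);
  return b.scan("emp")                                         // 1. source table
      .filter(b.call(SqlStdOperatorTable.GREATER_THAN,         // 2. WHERE clause
          b.field("sal"), b.literal(1000)))
      .aggregate(b.groupKey("deptno"), b.count(false, "cnt"))  // 3. GROUP BY
      .project(b.field("deptno"), b.field("cnt"))              // 5. SELECT
      .sortLimit(0, 10, b.field("deptno"))                     // 6./7. ORDER BY + LIMIT
      .build();
}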
Continuing genLogicalPlan at step 1.2, "Recurse over all the source tables":
RelNode op = genTableLogicalPlan(tableAlias, qb);
genTableLogicalPlan
Fetches the table metadata and builds the table's logical plan.
private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticException {
// 1. If the table has a Sample specified, bail from Calcite path.
// 2. if returnpath is on and hivetestmode is on bail
// 2. Get Table Metadata
// note: the table metadata is fetched here
Table tabMetaData = qb.getMetaData().getSrcForAlias(tableAlias);
// 3. Get Table Logical Schema (Row Type)
// NOTE: Table logical schema = Non Partition Cols + Partition Cols +
// Virtual Cols
// 3.1 Add Column info for non-partition cols (Object Inspector fields)
// 3.2 Add column info corresponding to partition columns
// 3.3 Add column info corresponding to virtual columns
// 4. Build operator
// 5. Build Hive Table Scan Rel
tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable,
null == tableAlias ? tabMetaData.getTableName() : tableAlias,
getAliasId(tableAlias, qb), HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP), qb.isInsideView()
|| qb.getAliasInsideView().contains(tableAlias.toLowerCase()));
// 6. Add Schema(RR) to RelNode-Schema map
return tableRel;
}
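Step 3's "logical schema = non-partition cols + partition cols + virtual cols" amounts to building one Calcite row type from three column groups. A sketch with made-up column names and types (BLOCK__OFFSET__INSIDE__FILE is one of Hive's real virtual columns):
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;

RelDataTypeFactory tf = new JavaTypeFactoryImpl(new HiveTypeSystemImpl());
RelDataType rowType = tf.builder()
    .add("id", SqlTypeName.INTEGER)                          // 3.1 non-partition col
    .add("ds", SqlTypeName.VARCHAR, 255)                     // 3.2 partition col
    .add("BLOCK__OFFSET__INSIDE__FILE", SqlTypeName.BIGINT)  // 3.3 virtual col
    .build();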