程式人生 > [6] Hive3.x SemanticAnalyzer and CalcitePlanner 物化檢視相關原始碼-02

[6] Hive3.x SemanticAnalyzer and CalcitePlanner 物化檢視相關原始碼-02

Hive3.x SemanticAnalyzer and CalcitePlanner 物化檢視相關原始碼

SemanticAnalyzer

void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) {
     ....
     // 1. Generate Resolved Parse tree from syntax tree
    boolean needsTransform = needsTransform();
    
    // 2. Gen OP Tree from resolved Parse Tree
Operator sinkOp = genOPTree(ast, plannerCtx);//進入CalcitePlanner::getOPTree //---待續 }

CalcitePlanner::genOPTree

這裡入參為hive的ASTNode
/**
 * Generates the operator tree for the query (excerpt: most of the body is elided).
 *
 * <p>The input is Hive's {@code ASTNode}. The visible step first obtains a
 * Calcite-optimized AST through {@code getOptimizedAST()}.
 *
 * @param ast the Hive syntax tree of the query
 * @param plannerCtx planner context, passed in by SemanticAnalyzer#analyzeInternal
 */
Operator genOPTree(ASTNode ast, PlannerContext plannerCtx){

      ......
      // 1. Gen Optimized AST: Calcite optimizes the plan, and the result is
      //    converted back into a Hive ASTNode.
         ASTNode newAST = getOptimizedAST();
      
      // (the rest of the method, including its return statement, is elided)

}

CalcitePlanner::getOptimizedAST

 /**
  * Optimizes the current query with Calcite and converts the optimized plan back
  * into a Hive {@code ASTNode}.
  *
  * @return the Hive AST rebuilt from the Calcite-optimized plan
  * @throws SemanticException declared by the signature; {@code logicalPlan()} declares it too
  */
 ASTNode getOptimizedAST() throws SemanticException {
     // Calcite-optimized logical plan (a RelNode tree).
     final RelNode optimizedPlan = logicalPlan();

     // Translate the RelNode tree back into Hive's AST, honoring the
     // HIVE_COLUMN_ALIGNMENT setting.
     return ASTConverter.convert(
             optimizedPlan,
             resultSchema,
             HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COLUMN_ALIGNMENT));
 }

CalcitePlanner::logicalPlan

RelNode logicalPlan() throws SemanticException {
    RelNode optimizedOptiqPlan = null;
    CalcitePlannerAction calcitePlannerAction = null;
  
    /**
     * Map of table name to names of accessed columns
     */
    Map<String, Set<String>>  this.columnAccessInfo = new ColumnAccessInfo();
    /**
     * CalcitePlannerAction is code responsible for Calcite plan generation and optimization.
     */
    calcitePlannerAction = new CalcitePlannerAction(
        prunedPartitions,
        ctx.getOpContext().getColStatsCache(),
        this.columnAccessInfo);
        
    //calcite 優化plan,這裡會調起優化工作CalcitePlanner::CalcitePlannerAction::apply()
    optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
          .newConfigBuilder().typeSystem(new HiveTypeSystemImpl()).build());
    return optimizedOptiqPlan;
  }

CalcitePlanner::CalcitePlannerAction

CalcitePlannerAction is code responsible for Calcite plan generation and optimization.

CalcitePlanner::CalcitePlannerAction::apply()

    /**
     * Generates and optimizes the Calcite plan for the current query block
     * (excerpt: several steps are elided or reduced to their heading comments).
     *
     * <p>Visible steps: create a new cluster backed by a HiveVolcanoPlanner; build the
     * initial plan from the QB; record whether the query is valid for materialization;
     * trim unused fields; install the metadata provider; remove and decorrelate subqueries;
     * apply pre-join-ordering transforms; and, when enabled, rewrite the plan using
     * materialized views.
     *
     * @param cluster cluster supplied by the framework; only its RexBuilder is reused
     * @param relOptSchema schema supplied by the framework; stored in a field
     * @param rootSchema root schema supplied by the framework (unused in the visible code)
     * @return the optimized plan. NOTE(review): in this excerpt calciteOptimizedPlan is never
     *         assigned, so as shown it returns null; presumably the elided steps 4-12 set it.
     */
    @Override
    public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlus rootSchema) {
      RelNode calciteGenPlan = null;
      RelNode calcitePreCboPlan = null;
      RelNode calciteOptimizedPlan = null;
      subqueryId = -1;

      /*
       * Recreate the cluster so that it picks up the additional traitDefs.
       *
       * The planner also receives two Set<RelNode> instances:
       * - corrScalarRexSQWithAgg: keeps track of whether a subquery is correlated and
       *   contains an aggregate, since this is special-cased when it is rewritten in
       *   SubqueryRemoveRule.
       * - scalarAggNoGbyNoWin.
       * conf is the Hive configuration.
       */
      // createPlanner builds a HiveVolcanoPlanner.
      RelOptPlanner planner = createPlanner(conf, corrScalarRexSQWithAgg, scalarAggNoGbyNoWin);
      
      final RexBuilder rexBuilder = cluster.getRexBuilder();
      final RelOptCluster optCluster = RelOptCluster.create(planner, rexBuilder);
      this.cluster = optCluster;
      this.relOptSchema = relOptSchema;

      // 1. Gen Calcite Plan
      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
      
      ...
      // getQB() -- QB: implementation of the query block, i.e. the result of syntax analysis.
      // Build the initial Calcite plan (a RelNode), calciteGenPlan, from the QB.
      calciteGenPlan = genLogicalPlan(getQB(), true, null, null);
      ...

      // Validate query materialization (materialized views, query results caching).
      // This check needs to occur before constant folding, which may remove some
      // function calls from the query plan.
      HiveRelOpMaterializationValidator matValidator = new HiveRelOpMaterializationValidator();
      matValidator.validateQueryMaterialization(calciteGenPlan);
      if (!matValidator.isValidMaterialization()) {
        String reason = matValidator.getInvalidMaterializationReason();
        setInvalidQueryMaterializationReason(reason);
      }

      // Create executor
      RexExecutor executorProvider = new HiveRexExecutorImpl(optCluster);
      calciteGenPlan.getCluster().getPlanner().setExecutor(executorProvider);

      // We need to get the ColumnAccessInfo and viewToTableSchema for views.
      HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null,
          HiveRelFactories.HIVE_BUILDER.create(optCluster, null), this.columnAccessInfo,
          this.viewProjectToTableSchema);

      fieldTrimmer.trim(calciteGenPlan);

      // Create and set MD provider
      HiveDefaultRelMetadataProvider mdProvider = new HiveDefaultRelMetadataProvider(conf);
      RelMetadataQuery.THREAD_PROVIDERS.set(
              JaninoRelMetadataProvider.of(mdProvider.getMetadataProvider()));

      // Remove subqueries by applying HiveSubQueryRemoveRule through hepPlan.
      calciteGenPlan = hepPlan(calciteGenPlan, false, mdProvider.getMetadataProvider(), null,
              new HiveSubQueryRemoveRule(conf));
    
      calciteGenPlan = HiveRelDecorrelator.decorrelateQuery(calciteGenPlan);
      LOG.debug("Plan after decorrelation:\n" + RelOptUtil.toString(calciteGenPlan));

      // 2. Apply pre-join order optimizations
      calcitePreCboPlan = applyPreJoinOrderingTransforms(calciteGenPlan,
              mdProvider.getMetadataProvider(), executorProvider);

      // 3. Materialized view based rewriting
      //    (details continue in the next part of this series)
      // We disable it for CTAS and MV creation queries (trying to avoid any problem
      // due to data freshness), and also while a materialized view is being loaded.
      if (conf.getBoolVar(ConfVars.HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING) &&
              !getQB().isMaterializedView() && !ctx.isLoadingMaterializedView() && !getQB().isCTAS()) {
        calcitePreCboPlan = applyMaterializedViewRewriting(planner,
            calcitePreCboPlan, mdProvider.getMetadataProvider(), executorProvider);
      }

      // Steps 4-12 are elided in this excerpt; only their headings remain.

      // 4. Apply join order optimizations: reordering MST algorithm
      
      // 5. Run other optimizations that do not need stats

      // 6. Run aggregate-join transpose (cost based)
      //    If it failed because of missing stats, we continue with
      //    the rest of optimizations
      
      // 7. Convert Join + GBy to semijoin
      // run this rule at later stages, since many calcite rules cant deal with semijoin
      
      // 8. convert SemiJoin + GBy to SemiJoin
     
      // 9. Get rid of sq_count_check if group by key is constant (HIVE-)
      
      // 10. Run rule to fix windowing issue when it is done over
      // aggregation columns (HIVE-10627)
      
      // 11. Apply Druid transformation rules
      
      // 12. Run rules to aid in translation from Calcite tree to Hive tree
        // 12.2.  Introduce exchange operators below join/multijoin operators
        
      return calciteOptimizedPlan;
    }

CalcitePlanner::createPlanner

/**
 * Creates the cost-based planner used for Calcite optimization in Hive.
 *
 * <p>Settings are read from {@code conf} and bundled into a {@code HivePlannerContext}:
 * cost-model limits, a fresh rules registry, a Calcite connection config, the two
 * subquery-tracking sets, and Hive-specific planner flags. The planner itself is built
 * by {@code HiveVolcanoPlanner.createPlanner}.
 *
 * @param conf Hive configuration
 * @param corrScalarRexSQWithAgg caller-owned set used to track correlated subqueries that
 *     contain aggregates
 * @param scalarAggNoGbyNoWin caller-owned set (presumably scalar aggregates without
 *     GROUP BY or windowing, judging by its name -- TODO confirm)
 * @return the configured planner
 */
private static RelOptPlanner createPlanner(
    HiveConf conf, Set<RelNode> corrScalarRexSQWithAgg, Set<RelNode> scalarAggNoGbyNoWin) {
    // Cost-model limits: the maximum split size, and the
    // HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD value, which serves as the memory limit.
    final Double splitSizeLimit =
        (double) HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
    final Double memoryLimit = (double) HiveConf.getLongVar(
        conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
    final HiveAlgorithmsConf costLimits = new HiveAlgorithmsConf(splitSizeLimit, memoryLimit);

    // Registry of rule applications. Per the original excerpt it holds:
    //   SetMultimap<RelOptRule, RelNode> registryVisited;
    //   ListMultimap<RelNode, Set<String>> registryPushedPredicates;
    final HiveRulesRegistry rulesRegistry = new HiveRulesRegistry();

    // Calcite connection config: the session's local time zone (for example "Asia/Shanghai"
    // in the original author's environment), with Calcite's own materializations turned off.
    final Properties connectionProps = new Properties();
    connectionProps.setProperty(
        CalciteConnectionProperty.TIME_ZONE.camelName(),
        conf.getLocalTimeZone().getId());
    connectionProps.setProperty(
        CalciteConnectionProperty.MATERIALIZATIONS_ENABLED.camelName(),
        Boolean.FALSE.toString());
    final CalciteConnectionConfig connectionConfig =
        new CalciteConnectionConfigImpl(connectionProps);

    // Hive-specific flags (both were true in the original author's environment).
    final boolean correlatedMultiKeyJoins =
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_CORRELATED_MULTI_KEY_JOINS);
    final String mvSelectionStrategy = HiveConf.getVar(
        conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_SELECTION_STRATEGY);
    final HiveConfPlannerContext hiveFlags = new HiveConfPlannerContext(
        correlatedMultiKeyJoins, mvSelectionStrategy.equals("heuristic"));

    final HivePlannerContext plannerContext = new HivePlannerContext(
        costLimits, rulesRegistry, connectionConfig,
        corrScalarRexSQWithAgg, scalarAggNoGbyNoWin, hiveFlags);
    return HiveVolcanoPlanner.createPlanner(plannerContext);
}

HiveVolcanoPlanner

/**
 * Hive's subclass of Calcite's {@code VolcanoPlanner} (excerpt: parts of the body are elided).
 *
 * <p>It uses {@code HiveCost.FACTORY} as its cost factory and registers the convention
 * trait definition, plus the collation trait definition when enabled. It adds Druid-specific
 * rules when a {@code DruidQuery} node is registered, and overrides {@code getCost} to lower
 * the cost of plans that contain materialized views (presumably only when
 * {@code isHeuristic} is set -- the body is elided, TODO confirm).
 */
public class HiveVolcanoPlanner extends VolcanoPlanner {
  // When true, createPlanner also registers RelCollationTraitDef.
  private static final boolean ENABLE_COLLATION_TRAIT = true;

  // Whether the heuristic materialized-view selection strategy is configured;
  // read from the HiveConfPlannerContext in the constructor.
  private final boolean isHeuristic;

  /** Creates a HiveVolcanoPlanner. */
  public HiveVolcanoPlanner(HivePlannerContext conf) {
    // Use HiveCost's factory for plan costs; see HiveCost for details.
    super(HiveCost.FACTORY, conf);
    isHeuristic = conf.unwrap(HiveConfPlannerContext.class).isHeuristicMaterializationStrategy();
  }

  /**
   * Creates a HiveVolcanoPlanner with the convention trait (and, if enabled, the
   * collation trait) registered.
   *
   * @param conf planner context
   * @return the new planner
   */
  public static RelOptPlanner createPlanner(HivePlannerContext conf) {
    final VolcanoPlanner planner = new HiveVolcanoPlanner(conf);
    planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
    if (ENABLE_COLLATION_TRAIT) {
      planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
    }
    return planner;
  }

  /**
   * Registers the class of {@code node}. DruidQuery nodes get Hive's Druid rules
   * (some additions are elided here) instead of the default registration.
   */
  @Override
  public void registerClass(RelNode node) {
    if (node instanceof DruidQuery) {
      // Special handling for Druid rules here as otherwise
      // planner will add Druid rules with logical builder
      addRule(HiveDruidRules.FILTER);
      ....
      return;
    }
    super.registerClass(node);
  }

  /**
   * The method extends the logic of the super method to decrease
   * the cost of the plan if it contains materialized views
   * (heuristic). The body is elided in this excerpt.
   */
  public RelOptCost getCost(RelNode rel, RelMetadataQuery mq) {
    .......
    return cost;
  }
}


genLogicalPlan

根據語法分析的結果QB,構建calcite的RelNode

/**
 * Builds the Calcite {@code RelNode} plan for a query block ({@code QB}), i.e. the result of
 * syntax analysis (excerpt: the step bodies are elided and only their comments remain).
 *
 * <p>NOTE(review): as excerpted, none of the locals below is ever assigned, so this returns
 * null; the elided steps presumably populate them.
 *
 * @param qb the query block to translate
 * @param outerMostQB whether {@code qb} is the outermost query block (apply() passes true)
 * @param outerNameToPosMap presumably the outer query's column name-to-position map, used for
 *     subqueries (apply() passes null) -- TODO confirm
 * @param outerRR presumably the outer query's row resolver, used for subqueries
 *     (apply() passes null) -- TODO confirm
 * @return the RelNode plan for {@code qb}
 * @throws SemanticException declared by the signature
 */
private RelNode genLogicalPlan(QB qb, boolean outerMostQB,
                                   ImmutableMap<String, Integer> outerNameToPosMap,
                                   RowResolver outerRR) throws SemanticException {
      RelNode srcRel = null;
      RelNode filterRel = null;
      RelNode gbRel = null;
      RelNode gbHavingRel = null;
      RelNode selectRel = null;
      RelNode obRel = null;
      RelNode limitRel = null;
      
    // 1. Build Rel For Src (SubQuery, TS, Join)
    // 1.1. Recurse over the subqueries to fill the subquery part of the plan 
    // 1.2 Recurse over all the source tables
    // 1.3 process join
    // 1.3.1 process hints
    // 1.3.2 process the actual join
    // 2. Build Rel for where Clause
    // 3. Build Rel for GB Clause
    // 4. Build Rel for GB Having Clause
    // 5. Build Rel for Select Clause
    // 6. Build Rel for OB Clause
    // 7. Build Rel for Limit Clause
    // 8. In case this QB corresponds to a subquery, modify its RR to point
    //    (sentence cut off in the excerpt; presumably "... to the subquery alias")
    return srcRel;
 }
      

接上genLogicalPlan

1.2 Recurse over all the source tables

// genLogicalPlan step 1.2: build the table-scan plan for a source table alias.
RelNode op = genTableLogicalPlan(tableAlias, qb);

genTableLogicalPlan

獲取元資料,構建TableLogicalPlan

/**
 * Builds the Calcite table-scan node ({@code HiveTableScan}) for one source table of a query
 * block, starting from the table's metadata (excerpt: most steps are elided).
 *
 * <p>{@code tableRel} and {@code optTable} are declared and built in elided code.
 *
 * @param tableAlias alias of the source table within {@code qb}
 * @param qb the query block that references the table
 * @return the table-scan RelNode
 * @throws SemanticException declared by the signature
 */
private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticException {
   // 1. If the table has a Sample specified, bail from Calcite path.
   // 2. if returnpath is on and hivetestmode is on bail
   // 2. Get Table Metadata
   // Note: this is where the table metadata is fetched.
    Table tabMetaData = qb.getMetaData().getSrcForAlias(tableAlias);

    // 3. Get Table Logical Schema (Row Type)
    // NOTE: Table logical schema = Non Partition Cols + Partition Cols +
    // Virtual Cols

    // 3.1 Add Column info for non partition cols (Object Inspector fields)
    // 3.2 Add column info corresponding to partition columns
    // 3.3 Add column info corresponding to virtual columns
    // 4. Build operator
    // 5. Build Hive Table Scan Rel
    //    The scan is named by the alias, falling back to the table name when the alias is null.
    //    NOTE(review): if tableAlias is null and qb.isInsideView() is false, the
    //    tableAlias.toLowerCase() call below throws NullPointerException, so the null
    //    fallback only takes effect inside views -- confirm whether a null alias can reach here.
      tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION),optTable,
              null == tableAlias ? tabMetaData.getTableName() : tableAlias,
              getAliasId(tableAlias, qb), HiveConf.getBoolVar(conf,
                  HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP), qb.isInsideView()
                  || qb.getAliasInsideView().contains(tableAlias.toLowerCase()));
    // 6. Add Schema(RR) to RelNode-Schema map
    return tableRel;

}