1. 程式人生 > 實用技巧 >Spark SQL Parser到Unresolved LogicPlan

Spark SQL Parser到Unresolved LogicPlan

Spark SQL Parser到Unresolved LogicPlan

Spark SQL Parser簡單來說就是將sql語句解析成為運算元樹的過程,在這個過程中,spark sql採用了antrl4來完成。

當執行spark.sql()方法時,會呼叫

Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
實際會呼叫:
/** Creates LogicalPlan for a given SQL string. */
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
val tmp = astBuilder.visitSingleStatement(parser.singleStatement())
tmp match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
}
}
解析詳細的操作如下:
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
logDebug(s"Parsing command: $command")
val oo = CharStreams.fromString(command)
logInfo(s"Parsing command: $oo")

val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)

val tokenStream = new CommonTokenStream(lexer)
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)

try {
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
toResult(parser)
}
catch {
case e: ParseCancellationException =>
// if we fail, parse with LL mode
tokenStream.seek(0) // rewind input stream
parser.reset()

// Try Again.
parser.getInterpreter.setPredictionMode(PredictionMode.LL)
toResult(parser)
}
}
catch {
case e: ParseException if e.command.isDefined =>
throw e
case e: ParseException =>
throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position)
}
}
}

在這裡面會利用antrl4來解析整個sql語句,首先會嘗試使用比較快速的SLL方式來解析,如果失敗會轉而使用LL方式來解析。解析完成的sqlBaseParser會呼叫singleStatement()之後會構建整棵樹

之後呼叫AstBuilder的visitSingleStatement來遞迴檢視每個節點,來返回生成的LogicalPlan, 這一步,主要利用antrl4生成的程式碼,使用訪問者模式來挨個檢視各個節點的處理,返回對應的結果,其主要的實現是繼承了SqlBaseBaseVisitor的AstBuild類中。

假設有一段sql如下:

SELECT * FROM NAME WHERE AGE > 10 

那麼它經過antrl4解析之後的樹結構如下:

在AstBuilder.visitSingleStatement方法中:

override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
  val statement: StatementContext = ctx.statement
  printRuleContextInTreeStyle(statement, 1)
  visit(ctx.statement).asInstanceOf[LogicalPlan]
}

  首先輸入是SingleStatementContext,實際也就是上面樹形圖裡面的根節點,之後獲取了根節點下面的StatementContext,由上圖可知實際獲取的就是StatementDefaultContext,再來看看他的accept方法

@Override
		public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
			if ( visitor instanceof SqlBaseVisitor ) return ((SqlBaseVisitor<? extends T>)visitor).visitStatementDefault(this);
			else return visitor.visitChildren(this);
		}

  大多數的節點都是類似這樣的實現,對某些特殊的節點例如FromClauseContext,在AstBuilder中有特殊的處理邏輯實現。所以AstBuilder解析整棵樹都是通過遍歷整棵樹,形成logicalPlan。

在上面的整棵樹裡面在解析到QuerySpecification節點時,在這裡面會觸發形成logicalPlan的操作:

 /**
   * Create a logical plan using a query specification.
   */
  override def visitQuerySpecification(
      ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
    val from = OneRowRelation().optional(ctx.fromClause) {
      visitFromClause(ctx.fromClause)
    }
    withQuerySpecification(ctx, from)
  }

  在visitFromClause中會去檢視FromClause節點下面的Relation,同時如果有join操作的話,也會在其中解析join操作,之後呼叫withQuerySpecification解析where,聚合、表示式等子節點形成一顆對整個sql解析之後的樹結構。

/**
   * Add a query specification to a logical plan. The query specification is the core of the logical
   * plan, this is where sourcing (FROM clause), transforming (SELECT TRANSFORM/MAP/REDUCE),
   * projection (SELECT), aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
   *
   * Note that query hints are ignored (both by the parser and the builder).
   */
  private def withQuerySpecification(
      ctx: QuerySpecificationContext,
      relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
    import ctx._

    // WHERE
    def filter(ctx: BooleanExpressionContext, plan: LogicalPlan): LogicalPlan = {
      Filter(expression(ctx), plan)
    }

    // Expressions.
    val expressions = Option(namedExpressionSeq).toSeq
      .flatMap(_.namedExpression.asScala)
      .map(typedVisit[Expression])

    // Create either a transform or a regular query.
    val specType = Option(kind).map(_.getType).getOrElse(SqlBaseParser.SELECT)
    specType match {
      case SqlBaseParser.MAP | SqlBaseParser.REDUCE | SqlBaseParser.TRANSFORM =>
        // Transform

        // Add where.
        val withFilter = relation.optionalMap(where)(filter)

        // Create the attributes.
        val (attributes, schemaLess) = if (colTypeList != null) {
          // Typed return columns.
          (createSchema(colTypeList).toAttributes, false)
        } else if (identifierSeq != null) {
          // Untyped return columns.
          val attrs = visitIdentifierSeq(identifierSeq).map { name =>
            AttributeReference(name, StringType, nullable = true)()
          }
          (attrs, false)
        } else {
          (Seq(AttributeReference("key", StringType)(),
            AttributeReference("value", StringType)()), true)
        }

        // Create the transform.
        ScriptTransformation(
          expressions,
          string(script),
          attributes,
          withFilter,
          withScriptIOSchema(
            ctx, inRowFormat, recordWriter, outRowFormat, recordReader, schemaLess))

      case SqlBaseParser.SELECT =>
        // Regular select

        // Add lateral views.
        val withLateralView = ctx.lateralView.asScala.foldLeft(relation)(withGenerate)

        // Add where.
        val withFilter = withLateralView.optionalMap(where)(filter)

        // Add aggregation or a project.
        val namedExpressions = expressions.map {
          case e: NamedExpression => e
          case e: Expression => UnresolvedAlias(e)
        }
        val withProject = if (aggregation != null) {
          withAggregation(aggregation, namedExpressions, withFilter)
        } else if (namedExpressions.nonEmpty) {
          Project(namedExpressions, withFilter)
        } else {
          withFilter
        }

        // Having
        val withHaving = withProject.optional(having) {
          // Note that we add a cast to non-predicate expressions. If the expression itself is
          // already boolean, the optimizer will get rid of the unnecessary cast.
          val predicate = expression(having) match {
            case p: Predicate => p
            case e => Cast(e, BooleanType)
          }
          Filter(predicate, withProject)
        }

        // Distinct
        val withDistinct = if (setQuantifier() != null && setQuantifier().DISTINCT() != null) {
          Distinct(withHaving)
        } else {
          withHaving
        }

        // Window
        val withWindow = withDistinct.optionalMap(windows)(withWindows)

        // Hint
        hints.asScala.foldRight(withWindow)(withHints)
    }
  }

  在這裡解析後的運算元樹是未繫結的樹結構算是unresolved LogicPlan。