Spark SQL Parser to Unresolved LogicalPlan
Simply put, the Spark SQL parser converts a SQL string into an operator tree, and Spark SQL uses ANTLR4 to do this.
When `spark.sql()` is executed, it calls:

```scala
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
```

which in turn invokes:
```scala
/** Creates LogicalPlan for a given SQL string. */
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
  val tmp = astBuilder.visitSingleStatement(parser.singleStatement())
  tmp match {
    case plan: LogicalPlan => plan
    case _ =>
      val position = Origin(None, None)
      throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
  }
}
```
The detailed parsing logic is as follows:
```scala
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
  logDebug(s"Parsing command: $command")
  val oo = CharStreams.fromString(command)
  logInfo(s"Parsing command: $oo")
  val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
  lexer.removeErrorListeners()
  lexer.addErrorListener(ParseErrorListener)
  val tokenStream = new CommonTokenStream(lexer)
  val parser = new SqlBaseParser(tokenStream)
  parser.addParseListener(PostProcessor)
  parser.removeErrorListeners()
  parser.addErrorListener(ParseErrorListener)
  try {
    try {
      // First, try parsing with the potentially faster SLL mode.
      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
      toResult(parser)
    } catch {
      case e: ParseCancellationException =>
        // If that fails, parse again with LL mode.
        tokenStream.seek(0) // rewind input stream
        parser.reset()
        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
        toResult(parser)
    }
  } catch {
    case e: ParseException if e.command.isDefined =>
      throw e
    case e: ParseException =>
      throw e.withCommand(command)
    case e: AnalysisException =>
      val position = Origin(e.line, e.startPosition)
      throw new ParseException(Option(command), e.message, position, position)
  }
}
```
Here ANTLR4 parses the entire SQL statement. The parser first tries the faster SLL prediction mode; if that fails, it falls back to the more general LL mode. Calling `singleStatement()` on the resulting SqlBaseParser then builds the whole parse tree.
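The SLL-then-LL fallback above can be sketched with a toy example. This is not Spark's or ANTLR's actual code; `TwoStageParse` and `ParseCancellation` are hypothetical stand-ins that only illustrate the "try the fast mode, on cancellation retry with the slow mode" control flow:

```scala
// Minimal sketch of the two-stage parse strategy: try a fast pass first,
// and on a cancellation exception fall back to the slower, more general pass.
object TwoStageParse {
  // Hypothetical failure type standing in for ANTLR's ParseCancellationException.
  final case class ParseCancellation(msg: String) extends RuntimeException(msg)

  def parse[T](input: String)(fast: String => T, slow: String => T): T =
    try fast(input) // first, the potentially faster SLL-style pass
    catch {
      case _: ParseCancellation => slow(input) // "rewind" and retry in LL mode
    }

  def main(args: Array[String]): Unit = {
    // A fake fast pass that bails out on inputs containing a space.
    val fast: String => Int = s =>
      if (s.contains(" ")) throw ParseCancellation("ambiguous") else s.length
    val slow: String => Int = _.length
    println(parse("SELECT")(fast, slow))   // fast path succeeds
    println(parse("SELECT *")(fast, slow)) // falls back to the slow path
  }
}
```

In ANTLR the same shape appears with `PredictionMode.SLL`, a `BailErrorStrategy`-style cancellation, and a token-stream rewind before the LL retry.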
Next, AstBuilder's visitSingleStatement is called to recursively visit each node and return the generated LogicalPlan. This step relies mostly on code generated by ANTLR4 and uses the visitor pattern to process each node in turn and return the corresponding result; the main logic lives in the AstBuilder class, which extends SqlBaseBaseVisitor.
Suppose we have the following SQL:

```sql
SELECT * FROM NAME WHERE AGE > 10
```

After ANTLR4 parses it, the tree structure looks like this:
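The original tree diagram is not reproduced here. The sketch below prints an approximation of it; the node names mirror the contexts generated from Spark's SqlBase grammar, but the exact shape depends on the grammar version, so treat this as illustrative only:

```scala
// Approximate ANTLR4 parse tree for: SELECT * FROM NAME WHERE AGE > 10
object ParseTreeSketch {
  final case class Node(name: String, children: Node*)

  // Print the tree with two-space indentation per level.
  def show(n: Node, depth: Int = 0): Unit = {
    println("  " * depth + n.name)
    n.children.foreach(show(_, depth + 1))
  }

  val tree: Node = Node("SingleStatementContext",
    Node("StatementDefaultContext",
      Node("QueryContext",
        Node("QueryTermDefaultContext",
          Node("QuerySpecificationContext",
            Node("NamedExpressionSeqContext"),    // SELECT *
            Node("FromClauseContext"),            // FROM NAME
            Node("BooleanExpressionContext")))))) // WHERE AGE > 10

  def main(args: Array[String]): Unit = show(tree)
}
```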
In the AstBuilder.visitSingleStatement method:
```scala
override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
  val statement: StatementContext = ctx.statement
  printRuleContextInTreeStyle(statement, 1)
  visit(ctx.statement).asInstanceOf[LogicalPlan]
}
```
The input is a SingleStatementContext, i.e. the root node of the tree above. The method then fetches the StatementContext beneath the root, which, as the tree shows, is actually a StatementDefaultContext. Now look at its accept method:
```java
@Override
public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
  if (visitor instanceof SqlBaseVisitor) {
    return ((SqlBaseVisitor<? extends T>) visitor).visitStatementDefault(this);
  } else {
    return visitor.visitChildren(this);
  }
}
```
Most nodes follow this pattern; a few special nodes, such as FromClauseContext, get dedicated handling logic in AstBuilder. AstBuilder thus produces the LogicalPlan by traversing the entire tree.
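The dispatch above is the classic visitor pattern. The toy sketch below reproduces its shape in Scala; the names (`Ctx`, `StatementDefault`, `CollectText`) are simplified stand-ins for the ANTLR-generated classes, not Spark's actual types:

```scala
// A visitor interface analogous to the generated SqlBaseVisitor.
trait Visitor[T] {
  def visitStatementDefault(ctx: StatementDefault): T
  def visitChildren(ctx: Ctx): T
}

sealed trait Ctx { def accept[T](v: Visitor[T]): T }

final case class StatementDefault(child: Ctx) extends Ctx {
  // Like the generated accept: dispatch to the node-specific visit method.
  def accept[T](v: Visitor[T]): T = v.visitStatementDefault(this)
}
final case class Leaf(text: String) extends Ctx {
  // Nodes with no special handling fall back to visitChildren.
  def accept[T](v: Visitor[T]): T = v.visitChildren(this)
}

// A concrete visitor, in the spirit of AstBuilder: walks the tree and
// builds a result (here just the leaf text, instead of a LogicalPlan).
object CollectText extends Visitor[String] {
  def visitStatementDefault(ctx: StatementDefault): String = ctx.child.accept(this)
  def visitChildren(ctx: Ctx): String = ctx match {
    case Leaf(t) => t
    case other   => other.accept(this)
  }
}
```

Calling `StatementDefault(Leaf("SELECT ...")).accept(CollectText)` walks the tree exactly the way `visit(ctx.statement)` does in AstBuilder.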
When the traversal reaches the QuerySpecification node of the tree above, construction of the LogicalPlan is triggered:
```scala
/**
 * Create a logical plan using a query specification.
 */
override def visitQuerySpecification(
    ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
  val from = OneRowRelation().optional(ctx.fromClause) {
    visitFromClause(ctx.fromClause)
  }
  withQuerySpecification(ctx, from)
}
```
visitFromClause inspects the Relation under the FromClause node and, if joins are present, resolves them there as well. It then calls withQuerySpecification to handle the WHERE clause, aggregations, expressions, and other child nodes, yielding a tree that represents the whole parsed SQL statement.
```scala
/**
 * Add a query specification to a logical plan. The query specification is the core of the logical
 * plan, this is where sourcing (FROM clause), transforming (SELECT TRANSFORM/MAP/REDUCE),
 * projection (SELECT), aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
 *
 * Note that query hints are ignored (both by the parser and the builder).
 */
private def withQuerySpecification(
    ctx: QuerySpecificationContext,
    relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
  import ctx._

  // WHERE
  def filter(ctx: BooleanExpressionContext, plan: LogicalPlan): LogicalPlan = {
    Filter(expression(ctx), plan)
  }

  // Expressions.
  val expressions = Option(namedExpressionSeq).toSeq
    .flatMap(_.namedExpression.asScala)
    .map(typedVisit[Expression])

  // Create either a transform or a regular query.
  val specType = Option(kind).map(_.getType).getOrElse(SqlBaseParser.SELECT)
  specType match {
    case SqlBaseParser.MAP | SqlBaseParser.REDUCE | SqlBaseParser.TRANSFORM =>
      // Transform

      // Add where.
      val withFilter = relation.optionalMap(where)(filter)

      // Create the attributes.
      val (attributes, schemaLess) = if (colTypeList != null) {
        // Typed return columns.
        (createSchema(colTypeList).toAttributes, false)
      } else if (identifierSeq != null) {
        // Untyped return columns.
        val attrs = visitIdentifierSeq(identifierSeq).map { name =>
          AttributeReference(name, StringType, nullable = true)()
        }
        (attrs, false)
      } else {
        (Seq(AttributeReference("key", StringType)(),
          AttributeReference("value", StringType)()), true)
      }

      // Create the transform.
      ScriptTransformation(
        expressions,
        string(script),
        attributes,
        withFilter,
        withScriptIOSchema(
          ctx, inRowFormat, recordWriter, outRowFormat, recordReader, schemaLess))

    case SqlBaseParser.SELECT =>
      // Regular select

      // Add lateral views.
      val withLateralView = ctx.lateralView.asScala.foldLeft(relation)(withGenerate)

      // Add where.
      val withFilter = withLateralView.optionalMap(where)(filter)

      // Add aggregation or a project.
      val namedExpressions = expressions.map {
        case e: NamedExpression => e
        case e: Expression => UnresolvedAlias(e)
      }
      val withProject = if (aggregation != null) {
        withAggregation(aggregation, namedExpressions, withFilter)
      } else if (namedExpressions.nonEmpty) {
        Project(namedExpressions, withFilter)
      } else {
        withFilter
      }

      // Having
      val withHaving = withProject.optional(having) {
        // Note that we add a cast to non-predicate expressions. If the expression itself is
        // already boolean, the optimizer will get rid of the unnecessary cast.
        val predicate = expression(having) match {
          case p: Predicate => p
          case e => Cast(e, BooleanType)
        }
        Filter(predicate, withProject)
      }

      // Distinct
      val withDistinct = if (setQuantifier() != null && setQuantifier().DISTINCT() != null) {
        Distinct(withHaving)
      } else {
        withHaving
      }

      // Window
      val withWindow = withDistinct.optionalMap(windows)(withWindows)

      // Hint
      hints.asScala.foldRight(withWindow)(withHints)
  }
}
```
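For a plain SELECT, the key point in the code above is the wrapping order: the FROM relation is wrapped by a Filter (WHERE), then a Project (the select list), then another Filter (HAVING), then Distinct. The toy sketch below, using hypothetical simplified plan nodes rather than Catalyst's real classes, makes that order explicit:

```scala
// Simplified stand-ins for Catalyst's logical plan nodes.
sealed trait Plan
final case class Relation(name: String) extends Plan
final case class Filter(cond: String, child: Plan) extends Plan
final case class Project(exprs: Seq[String], child: Plan) extends Plan
final case class Distinct(child: Plan) extends Plan

object SelectBuilder {
  // Mirrors the wrapping order in withQuerySpecification for a regular SELECT
  // (lateral views, aggregation, windows and hints omitted for brevity).
  def buildSelect(
      table: String,
      where: Option[String],
      select: Seq[String],
      having: Option[String],
      distinct: Boolean): Plan = {
    val from = Relation(table)
    // WHERE wraps the relation first.
    val withFilter: Plan = where.fold(from: Plan)(Filter(_, from))
    // Then the projection wraps the filter.
    val withProject = if (select.nonEmpty) Project(select, withFilter) else withFilter
    // HAVING becomes another Filter on top of the projection.
    val withHaving = having.fold(withProject)(Filter(_, withProject))
    // DISTINCT wraps everything last.
    if (distinct) Distinct(withHaving) else withHaving
  }
}
```

For the example query, `SelectBuilder.buildSelect("NAME", Some("AGE > 10"), Seq("*"), None, distinct = false)` yields `Project(Seq("*"), Filter("AGE > 10", Relation("NAME")))`, the same shape Catalyst produces.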
The operator tree produced at this point is still unbound, i.e. an unresolved LogicalPlan.
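"Unresolved" here means the plan still refers to tables and columns purely by name; binding those names against a catalog is the analyzer's job, not the parser's. The toy sketch below (hypothetical types, not Catalyst's `UnresolvedRelation`/`Analyzer` implementations) illustrates the distinction:

```scala
// A plan node knows whether its references have been bound yet.
sealed trait QueryPlan { def resolved: Boolean }
final case class UnresolvedRelation(name: String) extends QueryPlan {
  val resolved = false // the table name has not been looked up in any catalog yet
}
final case class ResolvedRelation(name: String, schema: Seq[String]) extends QueryPlan {
  val resolved = true
}

object ToyAnalyzer {
  // A minimal "analysis" step against a hypothetical in-memory catalog:
  // replace name-only references with relations carrying a real schema.
  def resolve(plan: QueryPlan, catalog: Map[String, Seq[String]]): QueryPlan = plan match {
    case UnresolvedRelation(n) if catalog.contains(n) => ResolvedRelation(n, catalog(n))
    case other => other
  }
}
```

The parser's output corresponds to `UnresolvedRelation("NAME")`; only after the analyzer consults the catalog does the plan become resolved.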