Spark SQL Catalyst原始碼分析之UDF
在SQL的世界裡,除了官方提供的常用的處理函式之外,一般都會提供可擴充套件的對外自定義函式介面,這已經成為一種事實的標準。
在前面Spark SQL原始碼分析之核心流程一文中,已經介紹了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions這個解析函式的功能。但是隨著Spark1.1版本的釋出,Spark SQL的程式碼有很多新完善和新功能了,和我先前基於1.0的原始碼分析多少有些不同,比如支援UDF:
spark1.0及以前的實現:
protected[sql] lazy val catalog: Catalog = new SimpleCatalog @transient protected[sql] lazy val analyzer: Analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true) //EmptyFunctionRegistry空實現 @transient protected[sql] val optimizer = Optimizer
Spark1.1及以後的實現:
protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry //SimpleFunctionRegistry實現,支援簡單的UDF
@transient
protected[sql] lazy val analyzer: Analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = true)
一、引子:
對於SQL語句中的函式,會經過SqlParser的的解析成UnresolvedFunction。UnresolvedFunction最後會被Analyzer解析。
SqlParser:
除了非官方定義的函式外,還可以定義自定義函式,sql parser會進行解析。
ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)
將SqlParser傳入的udfName和exprs封裝成一個class class UnresolvedFunction繼承自Expression。 只是這個Expression的dataType等一系列屬性和eval計算方法均無法訪問,強制訪問會丟擲異常,因為它沒有被Resolved,只是一個載體。
case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
override def dataType = throw new UnresolvedException(this, "dataType")
override def foldable = throw new UnresolvedException(this, "foldable")
override def nullable = throw new UnresolvedException(this, "nullable")
override lazy val resolved = false
// Unresolved functions are transient at compile time and don't get evaluated during execution.
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString = s"'$name(${children.mkString(",")})"
}<strong></strong>
Analyzer:
Analyzer初始化的時候會需要Catalog,database和table的元資料關係,以及FunctionRegistry來維護UDF名稱和UDF實現的元資料,這裡使用SimpleFunctionRegistry。
/**
* Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]].
*/
object ResolveFunctions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan =>
q transformExpressions { //對當前LogicalPlan進行transformExpressions操作
case u @ UnresolvedFunction(name, children) if u.childrenResolved => //如果遍歷到了UnresolvedFunction
registry.lookupFunction(name, children) //從UDF元資料表裡查詢udf函式
}
}
}
二、UDF註冊
2.1 UDFRegistration
registerFunction是UDFRegistration下的方法,SQLContext現在實現了UDFRegistration這個trait,只要匯入SQLContext,即可以使用udf功能。
UDFRegistration核心方法registerFunction:
registerFunction方法簽名def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit
接受一個udfName 和 一個FunctionN,可以是Function1 到Function22。即這個udf的引數只支援1-22個。(scala的痛啊)
內部builder通過ScalaUdf來構造一個Expression,這裡ScalaUdf繼承自Expression(可以簡單的理解目前的SimpleUDF即是一個Catalyst的一個Expression),傳入scala的function作為UDF的實現,並且用反射檢查欄位型別是否是Catalyst允許的,見ScalaReflection.
def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {
def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)//構造Expression
functionRegistry.registerFunction(name, builder)//向SQLContext的functionRegistry(維護了一個hashMap來管理udf對映)註冊
}
2.2 註冊Function:注意:這裡FunctionBuilder是一個type FunctionBuilder = Seq[Expression] => Expression
class SimpleFunctionRegistry extends FunctionRegistry {
val functionBuilders = new mutable.HashMap[String, FunctionBuilder]() //udf對映關係維護[udfName,Expression]
def registerFunction(name: String, builder: FunctionBuilder) = { //put expression進Map
functionBuilders.put(name, builder)
}
override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
functionBuilders(name)(children) //查詢udf,返回Expression
}
}
至此,我們將一個scala function註冊為一個catalyst的一個Expression,這就是spark的simple udf。三、UDF計算:
UDF既然已經被封裝為catalyst樹裡的一個Expression節點,那麼計算的時候也就是計算ScalaUdf的eval方法。
先通過Row和表示式計算function所需要的引數,最後通過反射呼叫function,來達到計算udf的目的。
ScalaUdf繼承自Expression:
scalaUdf接受一個function, dataType,和一系列表示式。
比較簡單,看註釋即可:
case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
extends Expression {
type EvaluatedType = Any
def nullable = true
override def toString = s"scalaUDF(${children.mkString(",")})"
override def eval(input: Row): Any = {
val result = children.size match {
case 0 => function.asInstanceOf[() => Any]()
case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input)) //反射呼叫function
case 2 =>
function.asInstanceOf[(Any, Any) => Any](
children(0).eval(input), //表示式引數計算
children(1).eval(input))
case 3 =>
function.asInstanceOf[(Any, Any, Any) => Any](
children(0).eval(input),
children(1).eval(input),
children(2).eval(input))
case 4 =>
......
case 22 => //scala function只支援22個引數,這裡枚舉了。
function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
children(0).eval(input),
children(1).eval(input),
children(2).eval(input),
children(3).eval(input),
children(4).eval(input),
children(5).eval(input),
children(6).eval(input),
children(7).eval(input),
children(8).eval(input),
children(9).eval(input),
children(10).eval(input),
children(11).eval(input),
children(12).eval(input),
children(13).eval(input),
children(14).eval(input),
children(15).eval(input),
children(16).eval(input),
children(17).eval(input),
children(18).eval(input),
children(19).eval(input),
children(20).eval(input),
children(21).eval(input))
四、總結
Spark目前的UDF其實就是scala function。將scala function封裝到一個Catalyst Expression當中,在進行sql計算時,使用同樣的Eval方法對當前輸入Row進行計算。
編寫一個spark udf非常簡單,只需給UDF起個函式名,並且傳遞一個scala function即可。依靠scala函式程式設計的表現能力,使得編寫scala udf比較簡單,且相較hive的udf更容易使人理解。
——EOF——
原創文章,轉載請註明:
注:本文基於署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協議,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章連結。如若需要用於商業目的或者與授權方面的協商,請聯絡我。