
Viewing and Debugging SparkSQL Generated Code

Websites and books often mention that SparkSQL (DataFrame) generates the code it finally executes from the operations you chain together. This post starts from a simple, low-level problem, tracks the root cause down by inspecting the generated code, and briefly shows how to debug SparkSQL along the way.

Where the problem came from:

case class Access(id: String, url: String, time: String) {
  def compute(): (String, Int) = ???   // body omitted in the original post
}

object Access {
  def apply(row: Row): Option[Access] = ???   // parses a Row into Some(Access) or None
}

// main
df.map(Access(_)).filter(!_.isEmpty).map(_.get).map(_.compute)

After running this, compute kept throwing NullPointerException. Coming from RDD and plain Scala semantics this makes no sense at all: how did the object turn into Access(null,null,null)? Changing the pipeline to df.flatMap(Access(_)).map(_.compute) made it run fine, but I still wanted to see what SparkSQL was actually doing!
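
For reference, here is a minimal sketch of the Option-based parsing that plays well with flatMap; the field extraction and the (String, Int) returned by compute are illustrative assumptions, not the original implementation:

import org.apache.spark.sql.Row

case class Access(id: String, url: String, time: String) {
  // Hypothetical scoring logic; the real compute() is not shown in the post.
  def compute(): (String, Int) = (id, url.length)
}

object Access {
  // Return None for malformed rows so that flatMap silently drops them.
  def apply(row: Row): Option[Access] =
    if (row.length >= 3 && !row.isNullAt(0))
      Some(Access(row.getString(0), row.getString(1), row.getString(2)))
    else None
}

// flatMap unwraps the Option in one pass, so no Option values ever live inside the Dataset:
// df.flatMap(Access(_)).map(_.compute)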

What SparkSQL actually does

A Spark RDD spells out its work explicitly in RDD#compute. SparkSQL operations, on the other hand, are turned into a LogicalPlan, so you cannot tell directly what they end up doing.

In fact, much like EXPLAIN for a database SQL statement, SparkSQL provides an explain method for inspecting a program's execution plan. (The code is pasted here in full; uncomment the parts you need for your own run.)

import java.io.File

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object AccessAnalyser {

  def main(args: Array[String]): Unit = {

    // conf

    // clean
    new File("target/generated-sources").listFiles().filter(_.isFile()).foreach(_.delete)

    sys.props("org.codehaus.janino.source_debugging.enable") = "true"
    sys.props("org.codehaus.janino.source_debugging.dir") = "target/generated-sources"

    val input = "r:/match10.dat"
    val output = "r:/output"
    def delete(f: File): Unit = {
      if (f.isDirectory) f.listFiles().foreach(delete)
      f.delete()
    }
    delete(new File(output))

    // program

    val conf = new SparkConf().setAppName("DPI Analyser").setMaster("local[10]")
    // fix windows path.
    conf.set(/*SQLConf.WAREHOUSE_PATH*/ "spark.sql.warehouse.dir", "spark-warehouse")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "false") // Use first line of all files as header
      .option("quote", "'")
      .option("escape", "'")
      .option("delimiter", ",")
      .load(input)

    df
      .flatMap(Access(_))
      //      .map(Access(_)).filter((t: Option[Access]) => !t.isEmpty).map(_.get) // Option does not play well with SparkSQL
      .map(_.compute)
      .explain(true)
      //      .toDF("id", "score")
      //      .groupBy("id").agg(sum("score") as "score")
      //      .sort("score", "id")
      //      .repartition(1)
      //      .write.format("com.databricks.spark.csv").save(output)

    sc.stop()
  }

}

Running the code above prints the job's execution plan in the console window:

== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1, true) AS _1#20, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#21]
+- 'MapElements <function1>, obj#19: scala.Tuple2
   +- 'DeserializeToObject unresolveddeserializer(newInstance(class com.github.winse.spark.access.Access)), obj#18: com.github.winse.spark.access.Access
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.github.winse.spark.access.Access, true], top level non-flat input object).id, true) AS id#12, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.github.winse.spark.access.Access, true], top level non-flat input object).url, true) AS url#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.github.winse.spark.access.Access, true], top level non-flat input object).time, true) AS time#14]
         +- MapPartitions <function1>, obj#11: com.github.winse.spark.access.Access
            +- DeserializeToObject createexternalrow(_c0#0.toString, _c1#1.toString, _c2#2.toString, StructField(_c0,StringType,true), StructField(_c1,StringType,true), StructField(_c2,StringType,true)), obj#10: org.apache.spark.sql.Row
               +- Relation[_c0#0,_c1#1,_c2#2] csv

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1, true) AS _1#20, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#21]
+- *MapElements <function1>, obj#19: scala.Tuple2
   +- MapPartitions <function1>, obj#11: com.github.winse.spark.access.Access
      +- DeserializeToObject createexternalrow(_c0#0.toString, _c1#1.toString, _c2#2.toString, StructField(_c0,StringType,true), StructField(_c1,StringType,true), StructField(_c2,StringType,true)), obj#10: org.apache.spark.sql.Row
         +- *Scan csv [_c0#0,_c1#1,_c2#2] Format: CSV, InputPaths: file:/r:/match10.dat, PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string>

OK, now we can see the execution plan. But what does the generated code look like, and how do we debug it?
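
As a side note, if you just want to dump the generated source without attaching a debugger, the helpers in org.apache.spark.sql.execution.debug can print it; a small sketch, assuming the same df and Access as above:

import org.apache.spark.sql.execution.debug._

// Prints the generated Java source of each whole-stage-codegen subtree
// (the GeneratedIterator classes referenced later) to stdout.
df.flatMap(Access(_)).map(_.compute).debugCodegen()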

Hacking the source

Before debugging, tweak the catalyst code as follows, rebuild it for debugging, and replace the spark-catalyst_2.11 jar under your local Maven repository:

~/git/spark/sql/catalyst
$ git diff .
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 16fb1f6..56bfbf7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -854,7 +854,7 @@ object CodeGenerator extends Logging {
     val parentClassLoader = new ParentClassLoader(Utils.getContextOrSparkClassLoader)
     evaluator.setParentClassLoader(parentClassLoader)
     // Cannot be under package codegen, or fail with java.lang.InstantiationException
-    evaluator.setClassName("org.apache.spark.sql.catalyst.expressions.GeneratedClass")
     evaluator.setDefaultImports(Array(
       classOf[Platform].getName,
       classOf[InternalRow].getName,
@@ -875,12 +875,14 @@ object CodeGenerator extends Logging {

     logDebug({
       // Only add extra debugging info to byte code when we are going to print the source code.
-      evaluator.setDebuggingInformation(true, true, false)
+      evaluator.setDebuggingInformation(true, true, true)
       s"\n$formatted"
     })

     try {
-      evaluator.cook("generated.java", code.body)
+      evaluator.cook(code.body)
       recordCompilationStats(evaluator)
     } catch {
       case e: Exception =>

E:\git\spark\sql\catalyst>mvn clean package -DskipTests -Dmaven.test.skip=true

SparkSQL generates its code with janino, and the official docs cover debugging: http://janino-compiler.github.io/janino/#debugging . A quick note on the three changes:

  • Look at the org.codehaus.janino.Scanner constructor: when debugging is enabled and optionalFileName == null, the generated source is written out to a temporary file.
  • Commenting out setClassName was not obvious at first. I copied CodeGenerator#doCompile out and compared it step by step against the example in the janino docs; once setClassName was swapped for setExtendedClass, the source page suddenly popped up. Since setExtendedClass is already called a few lines further down, simply commenting out setClassName does the job.
  • Variables could not be inspected in the generated source because that information was being stripped at compile time; set debugVars to true. (A standalone janino sketch of these settings follows this list.)
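
To make the janino side concrete, here is a minimal, standalone sketch of the same debugging knobs (the class body and the target/generated-sources directory are illustrative, and this is not Spark's actual CodeGenerator):

import org.codehaus.janino.ClassBodyEvaluator

object JaninoDebugDemo {
  def main(args: Array[String]): Unit = {
    // With these properties set, janino dumps the generated .java source into the
    // given directory, which is what lets the IDE find and step through it later.
    sys.props("org.codehaus.janino.source_debugging.enable") = "true"
    sys.props("org.codehaus.janino.source_debugging.dir") = "target/generated-sources"

    val evaluator = new ClassBodyEvaluator()
    // Keep source, line and variable debug information (debugVars = true),
    // mirroring the change made to CodeGenerator above.
    evaluator.setDebuggingInformation(true, true, true)
    // cook(code) without a file name lets the Scanner write the source to a temp file
    // under source_debugging.dir instead of compiling an anonymous in-memory string.
    evaluator.cook(
      """
        |public int add(int a, int b) {
        |  int sum = a + b;   // a breakpoint here can inspect 'sum' thanks to debugVars
        |  return sum;
        |}
      """.stripMargin)
    println(evaluator.getClazz.getName)
  }
}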

Running the debugger

First, get the debugging groundwork in place:

  • Set a breakpoint inside the compute method and start a debug run
  • Bump the codegen log level in log4j: log4j.logger.org.apache.spark.sql.catalyst.expressions.codegen=DEBUG
  • Import the project into Eclipse (IDEA would not pop up the generated source)

Then run it. In the Debug view, click GeneratedIterator; in the code view that opens, click the button to look up the source, and in the Edit Source Lookup Path dialog that pops up add the path target/generated-sources (it must be an absolute path!). From there, just step through.

Stepping through the generated code makes the explain output above much easier to follow, and once you see the code, the original Access(null,null,null) makes sense: it is an object-to-columns serialization/deserialization problem.
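
As a rough illustration of that round trip, the SerializeFromObject / DeserializeToObject nodes in the plan above boil down to what the Spark 2.x encoder API does by hand (toRow/fromRow here are an assumption tied to that API generation):

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Serialize an Access into an InternalRow (one UTF8String column per field),
// then rebuild the object from those columns via newInstance(class Access) --
// the same expressions shown in the parsed logical plan above.
val encoder = ExpressionEncoder[Access]()
val internalRow = encoder.toRow(Access("1", "http://example.com/a", "2016-10-12"))
val roundTripped = encoder.resolveAndBind().fromRow(internalRow)
println(roundTripped)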

From: http://www.winseliu.com/blog/2016/10/12/sparksql-view-and-debug-generatecode/