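Spark-Scala Unit Testing in Practice

Setting Up the Unit Test Framework

The tools and frameworks used are:

  • scalatest_2.11 (3.0.0) - the core test framework

  • mockito-scala_2.11 (1.16.37) - mocking framework

  • spark-fast-tests_2.11 (0.23.0) - assertions (comparing DataFrames)

  • scalatest-maven-plugin - Maven plugin that runs the tests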

Example POM file

<dependencies>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest_2.11</artifactId>
        <version>3.0.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.github.mrpowers</groupId>
        <artifactId>spark-fast-tests_2.11</artifactId>
        <version>0.23.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-scala_2.11</artifactId>
        <version>1.16.37</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest-maven-plugin</artifactId>
            <version>1.0</version>
            <configuration>
                <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                <junitxml>.</junitxml>
                <filereports>WDF TestSuite.txt</filereports>
            </configuration>
            <executions>
                <execution>
                    <id>test</id>
                    <goals>
                        <goal>test</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
    <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
</build>

Although Maven was configured with all the required jars, Spark failed to initialize when the unit tests ran during packaging:

org.apache.maven.surefire.util.SurefireReflectionException: java.lang.reflect.InvocationTargetException; nested exception is java.lang.reflect.InvocationTargetException: null
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/Module
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:200)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:196)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
        at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:196)
        at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:104)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:514)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.Module
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

The cause is a jar conflict, so the Jackson jars have to be declared explicitly. Add the following dependencies to the POM:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.6.7</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.7</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.6.7</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>2.6.7.1</version>
</dependency>
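To confirm which jar a class such as com.fasterxml.jackson.databind.Module is actually loaded from, or whether it is on the test classpath at all, a small reflection helper can be called from a test. The sketch below uses only JDK APIs; the object name `ClassLocator` and its output format are hypothetical choices, not part of any library.

```scala
object ClassLocator {
  /** Returns the location (jar or directory) a class was loaded from,
    * "<bootstrap>" for JDK classes that have no CodeSource,
    * or None if the class cannot be loaded at all. */
  def locate(className: String): Option[String] =
    try {
      val cls = Class.forName(className)
      val source = Option(cls.getProtectionDomain.getCodeSource)
      Some(source.flatMap(s => Option(s.getLocation)).map(_.toString).getOrElse("<bootstrap>"))
    } catch {
      // Missing class, or a class whose own dependencies are missing
      case _: ClassNotFoundException | _: LinkageError => None
    }

  def main(args: Array[String]): Unit = {
    // Print where Jackson's databind Module comes from (None if absent)
    println(locate("com.fasterxml.jackson.databind.Module"))
  }
}
```

Running it once under `mvn test` and once in the IDE, and comparing the printed paths, is one way to see which dependency supplies (or fails to supply) the Jackson classes.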

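ScalaTest offers several testing styles, including FunSuite, FlatSpec, FunSpec, WordSpec, FreeSpec, PropSpec and FeatureSpec.

This project uses FunSuite, which is flexible and closest to the traditional test-method style.

Example

Testing a UDF in the following class: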

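import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// TDW is a project-specific trait (not shown)
class Clue(spark: SparkSession, ds: String) extends TDW with Serializable {
    // Remove punctuation and whitespace, keeping only Chinese characters, English letters and digits
    val removeSymbolUdf: UserDefinedFunction = udf((title: String) => {
        if (title == null) null // Spark passes null for NULL values; avoid a NullPointerException
        else {
            val pattern = "[\\u4e00-\\u9fa5a-zA-Z0-9]+".r
            // Concatenate every run of allowed characters
            pattern.findAllIn(title).mkString
        }
    })
}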
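Because the UDF body is plain Scala, one option is to move the regex logic into an ordinary function that can be tested without starting Spark, and have the UDF call it. The `TitleCleaner` object below is a hypothetical refactoring sketch, not the project's code; it maps null input to null, which is what a Spark UDF with a String parameter receives for NULL column values.

```scala
object TitleCleaner {
  // Runs of CJK Unified Ideographs (U+4E00 to U+9FA5), ASCII letters and digits
  private val allowed = "[\\u4e00-\\u9fa5a-zA-Z0-9]+".r

  /** Keeps only Chinese characters, English letters and digits; null stays null. */
  def removeSymbols(title: String): String =
    if (title == null) null
    else allowed.findAllIn(title).mkString
}
```

The UDF could then be written as `udf((title: String) => TitleCleaner.removeSymbols(title))`, leaving only a thin Spark-level test.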

A trait that initializes Spark for the tests:

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[*]")
      .appName("spark test example")
      .config("spark.driver.bindAddress","127.0.0.1")
      .getOrCreate()
  }

}
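Some settings are commonly adjusted for local tests. The variant below is an optional sketch, not a required setup: it lowers `spark.sql.shuffle.partitions` (200 by default) because test DataFrames are tiny, and turns off the web UI. The trait name `FastSparkSessionTestWrapper` is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

trait FastSparkSessionTestWrapper {
  lazy val spark: SparkSession =
    SparkSession
      .builder()
      .master("local[2]")
      .appName("spark test example")
      .config("spark.driver.bindAddress", "127.0.0.1")
      // Few shuffle partitions: the default of 200 mostly adds overhead on tiny test data
      .config("spark.sql.shuffle.partitions", "2")
      // No web UI is needed in unit tests
      .config("spark.ui.enabled", "false")
      .getOrCreate()
}
```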

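The unit test class:

import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class ClueTest extends FunSuite with SparkSessionTestWrapper with MockitoSugar
               with DataFrameComparer with Serializable {
    // Replace save with a no-op so the test does not write any table
    private val obj = new Clue(null, "20210806") {
        override def save(spark: SparkSession, table: String, df: DataFrame, impDate: String): Unit = {}
    }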
    test("testRemoveSymbolUdf") {
        import spark.implicits._

        val sourceDF = Seq(
            "1234這裡this是is一個a測%試test案$例case     ",
            "這裡是。.,;*&?*;&……一個1234測試案例"
        ).toDF("title")

        val actualDF = sourceDF.withColumn("title", obj.removeSymbolUdf(col("title")))

        val expectedDF = Seq(
            "1234這裡this是is一個a測試test案例case",
            "這裡是一個1234測試案例"
        ).toDF("title")

        // assertSmallDataFrameEquality, from spark-fast-tests' DataFrameComparer, asserts that two DataFrames are equal
        assertSmallDataFrameEquality(actualDF, expectedDF)
    }
}

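Results

If the test above runs without errors, it has passed.

When Maven packages the project, it runs all of the tests.

Mocking framework

The mocking framework used is mockito-scala.

Partial mocks

mock[Class] mocks every method of the class, and each mocked method returns Mockito's default value for its return type. To mock all methods but call the real implementation of a few of them, write: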

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class TestMock {
    def doSomethingA(): String = "doSomethingA not mocked"
    def doSomethingB(): String = "doSomethingB not mocked"
}

class TestMockSuite1 extends FunSuite with MockitoSugar {
    test("testMock") {
        val m = mock[TestMock]
        // Only doSomethingA runs its real implementation
        doCallRealMethod.when(m).doSomethingA()
        println(m.doSomethingA()) // prints doSomethingA not mocked
        println(m.doSomethingB()) // prints an empty string
    }
}

Another option is spy: methods that are not stubbed call the real implementation, while stubbed methods return the stubbed value.

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class TestMock {
    def doSomethingA(): String = "doSomethingA not mocked"
    def doSomethingB(): String = "doSomethingB not mocked"
}

class TestMockSuite2 extends FunSuite with MockitoSugar {
    test("testMock") {
        val m = spy(new TestMock())
        println(m.doSomethingA()) // prints doSomethingA not mocked
        println(m.doSomethingB()) // prints doSomethingB not mocked
        when(m.doSomethingB()).thenReturn("mocked") // this still calls the real method once; it only swaps the return value
        // doReturn("mocked").when(m).doSomethingB() // this form avoids calling the real method
        println(m.doSomethingA()) // prints doSomethingA not mocked
        println(m.doSomethingB()) // prints mocked
    }
}
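The difference between `when(...).thenReturn(...)` and `doReturn(...).when(...)` on a spy can be made visible with a counter. In this sketch, `CountingService` and `SpyStubbingDemo` are hypothetical names; `spy`, `when` and `doReturn` come from mockito-scala's `MockitoSugar`, as in the examples above.

```scala
import org.mockito.MockitoSugar

// Hypothetical class that records how often its real method body runs
class CountingService {
  var realCalls = 0
  def compute(): String = { realCalls += 1; "real" }
}

object SpyStubbingDemo extends MockitoSugar {
  /** Returns (real calls after when-style stubbing, real calls after doReturn-style stubbing, final result). */
  def run(): (Int, Int, String) = {
    val s = spy(new CountingService)
    // when(...) has to evaluate s.compute(), so the real body runs once here
    when(s.compute()).thenReturn("stubbed by when")
    val afterWhen = s.realCalls
    // doReturn(...).when(s) puts the spy in stubbing mode: the real body does not run
    doReturn("stubbed by doReturn").when(s).compute()
    val afterDoReturn = s.realCalls
    (afterWhen, afterDoReturn, s.compute())
  }
}
```

With `when`, the real `compute()` body runs once while the stub is being set up, which matters if the real method has side effects or fails on mocked state; `doReturn` avoids that call.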

Mocking class methods inherited from a trait

For this kind of mock you cannot simply mock the trait. One workable approach is to create a subclass and override the method.

Example

trait A {
    def doSomethingA(): String = "not mocked, invoke trait A.doSomethingA"
}

class B extends A

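Mocking it as below does not work: the mock of A and the new B instance are unrelated objects, so the stub never reaches b.

val a = mock[A]
when(a.doSomethingA()).thenReturn("mocked")
val b = new B()
println(b.doSomethingA()) // prints: not mocked, invoke trait A.doSomethingA

The working approach is:

import org.mockito.Answers.CALLS_REAL_METHODS
import org.mockito.Mockito

// Subclass B and override the method inherited from trait A
val obj = new B {
    override def doSomethingA(): String = "mocked"
}
// Wrap the subclass instance in a spy-like mock; unstubbed calls run its (overridden) methods
val t = Mockito.mock(obj.getClass, Mockito.withSettings()
                     .spiedInstance(obj).defaultAnswer(CALLS_REAL_METHODS))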

Mock Scala Object

Objects support inline mocking. Test code that needs the mocked object must be written inside the braces of withObjectMocked[D.type]:

class D {
    def value(): String = "D_value"
}
class DMock extends D {
    override def value(): String = "mocked"
}
object D {
    def apply(): D = new D()
}
// Inside a test suite that mixes in MockitoSugar
withObjectMocked[D.type] {
    when(D.apply()).thenReturn(new DMock())
    // test code that uses D goes here
}

When mocking an object, the following error is reported:

org.mockito.exceptions.base.MockitoException: 
Cannot mock/spy class com.pkslow.basic.MockitoTest$FinalPumpkin
Mockito cannot mock/spy because :
 - final class

quote from: How to mock scala object #303 https://github.com/mockito/mockito-scala/issues/303

https://github.com/mockito/mockito/wiki/What's-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods

The fix is to add a file named org.mockito.plugins.MockMaker under src/test/resources/mockito-extensions, containing the single line mock-maker-inline.

However, packaging then fails with:

Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Could not initialize plugin: interface org.mockito.plugins.MockMaker
	at com.dummy.feed.query.TestUtils.getRequestData(TestUtils.java:879)
	at com.dummy.feed.query.resultsetfilters.TestResultSetFiltersManager.<init>(TestResultSetFiltersManager.java:63)
	... 53 more
Caused by: java.lang.IllegalStateException: Could not initialize plugin: interface org.mockito.plugins.MockMaker
	at org.mockito.internal.configuration.plugins.PluginLoader$1.invoke(PluginLoader.java:66)
	at com.sun.proxy.$Proxy10.isTypeMockable(Unknown Source)
	at org.mockito.internal.util.MockUtil.typeMockabilityOf(MockUtil.java:29)
	at org.mockito.internal.util.MockCreationValidator.validateType(MockCreationValidator.java:22)
	at org.mockito.internal.creation.MockSettingsImpl.validatedSettings(MockSettingsImpl.java:186)
	at org.mockito.internal.creation.MockSettingsImpl.confirm(MockSettingsImpl.java:180)
	at org.mockito.internal.MockitoCore.mock(MockitoCore.java:62)
	at org.mockito.Mockito.mock(Mockito.java:1729)
	at org.mockito.Mockito.mock(Mockito.java:1642)
...
Caused by: java.lang.IllegalStateException: Error during attachment using: net.bytebuddy.agent.ByteBuddyAgent$AttachmentProvider$Compound@a5092e63
	at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:379)
	at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:353)
	at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:321)
	at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:307)
	at org.mockito.internal.creation.bytebuddy.InlineByteBuddyMockMaker.<clinit>(InlineByteBuddyMockMaker.java:102)

quote from: https://stackoverflow.com/questions/49767683/what-does-no-compatible-attachment-provider-is-available-mean

After some debugging I found the problem. The message "No compatible attachment provider is not available" occurs if the agent was called with a jre instead of a jdk.

Unfortunately calling java -version does not return whether java is a jdk or a jre (the message is displaying Java runtime Environment for both).

In my case (OS:Windows) it was tricky, because newer jsdk-installations attach C:\ProgramData\Oracle\Java\javapath to the system path, which contains a jre and not a jdk. The formerly added entry %JAVA_HOME%/bin got hidden through this modification. When I removed the entry C:\ProgramData\Oracle\Java\javapath everything worked fine.

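ByteBuddyAgent.install works when it is called from a JDK, but fails with this error when called from a JRE. Newer JDK installers also install a public JRE, which takes precedence over the JDK's own JRE. The fix is therefore to uninstall that JRE. (Installing a JDK sets up two JREs, one inside the JDK and one public JRE; uninstalling the public JRE is safe.)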
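Because `java -version` does not reveal whether a JDK or a JRE is running, it can help to print this from inside the test JVM itself. The sketch below relies on a heuristic that is an assumption of this example, not something ByteBuddy checks: a JDK provides the system Java compiler through `javax.tools.ToolProvider`, while a standalone JRE returns null there. `JdkCheck` is a hypothetical name.

```scala
import javax.tools.ToolProvider

object JdkCheck {
  /** Heuristic (assumption): a JDK provides the system Java compiler, a standalone JRE does not. */
  def runningOnJdk: Boolean = ToolProvider.getSystemJavaCompiler != null

  /** Summarizes which Java installation the current JVM (for example the test JVM) comes from. */
  def describe(): String =
    "java.home=" + System.getProperty("java.home") +
      ", java.version=" + System.getProperty("java.version") +
      ", jdk=" + runningOnJdk

  def main(args: Array[String]): Unit = println(describe())
}
```

If the printed java.home points at a public JRE rather than the JDK, that matches the situation described in the quote above.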

Mocking objects created with new

Suppose you want to mock an object that a method creates with new, as in the code below:

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class D {
    def value(): String = "D_value"
}
class TestMock {
    def testNewObject(): String = new D().value()
}
class TestMockSuiteD extends FunSuite with MockitoSugar {
    test("testMock") {
        val d = mock[D]
        when(d.value()).thenReturn("mocked") // has no effect on the D created with new inside testNewObject
        val m = spy(new TestMock())
        println(m.testNewObject()) // prints D_value
    }
}

I found no way to mock objects created with new using ScalaTest + mockito-scala, which is a real pitfall in an object-oriented language. The workaround is to move the new into a factory method, so that mocking the object that owns the factory also mocks the construction:

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class D {
    def value(): String = "D_value"
}
class DMock extends D {
    override def value(): String = "mocked"
}
object D {
    // Factory method: construction goes through the object, which can be mocked
    def apply(): D = new D()
}
class TestMock {
    def testNewObject(): String = D().value()
}
class TestMockSuiteD extends FunSuite with MockitoSugar {
    test("testMock") {
        withObjectMocked[D.type] {
            when(D.apply()).thenReturn(new DMock())
            val m = spy(new TestMock())
            println(m.testNewObject()) // prints mocked
        }
    }
}
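If changing a companion object is not an option, a related approach that needs no Mockito at all is constructor injection of the factory: the class takes a `() => D` parameter that defaults to the real constructor, and tests pass a factory that returns a stub. This is a different technique from the object mock above, shown only as a sketch; `TestMockWithFactory` and `FactoryInjectionDemo` are hypothetical names.

```scala
class D {
  def value(): String = "D_value"
}

// The factory defaults to the real constructor, so production callers need not change
class TestMockWithFactory(newD: () => D = () => new D()) {
  def testNewObject(): String = newD().value()
}

object FactoryInjectionDemo {
  def main(args: Array[String]): Unit = {
    val real = new TestMockWithFactory()
    // In a test, pass a factory that returns a stubbed subclass instead
    val stubbed = new TestMockWithFactory(() => new D { override def value(): String = "mocked" })
    println(real.testNewObject())    // D_value
    println(stubbed.testNewObject()) // mocked
  }
}
```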

Serialization issues

When writing unit tests you will often run into:

Task not serializable
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
...
Caused by: java.io.NotSerializableException: org.mockito.internal.creation.DelegatingMethod
Serialization stack:
	- object not serializable (class: org.mockito.internal.creation.DelegatingMethod, value: org.mockito.internal.creation.DelegatingMethod@a97f2bff)
	- field (class: org.mockito.internal.invocation.ScalaInvocation, name: mockitoMethod, type: interface org.mockito.internal.invocation.MockitoMethod)
	- object (class org.mockito.internal.invocation.ScalaInvocation, clue.toString();)
	- field (class: org.mockito.internal.invocation.InvocationMatcher, name: invocation, type: interface org.mockito.invocation.Invocation)
	- object (class org.mockito.internal.invocation.InvocationMatcher, clue.toString();)

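That is, the task cannot be serialized.

Our own classes already implement Serializable, so why the serialization error? Because of spy(new A()): the spied object is created by Mockito, and it references Mockito internals that are not serializable (such as DelegatingMethod in the error above), which triggers the error.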
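To find this kind of problem without running a Spark job, you can try Java serialization directly on the object a closure captures; this is roughly what Spark's `ClosureCleaner.ensureSerializable` does with its default (Java) closure serializer. The helper below is a hypothetical sketch that uses only `java.io`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  /** Attempts Java serialization of obj.
    * Returns None if it succeeds, or Some(message) naming the first non-serializable class. */
  def check(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      // The exception message starts with the name of the offending class
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```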

For someone unfamiliar with Mockito's source code, the first resort is Google.

quote from: https://stackoverflow.com/questions/53820877/spark-scala-mocking-task-not-serializable

Mocks are not serialisable by default, as it's usually a code smell in unit testing

You can try enabling serialisation by creating the mock like mock[MyType](Mockito.withSettings().serializable()) and see what happens when spark tries to use it.

BTW, I recommend you to use mockito-scala instead of the traditional mockito as it may save you some other problems

Spying in mockito-scala works exactly the same as regular mockito... What you can try is a mock with delegate witch is how spyLambda works, i.e. Mockito.mock(classOf[MyType], Mockito.withSettings().serializable().defaultAnswer(AdditionalAnswers.delegatesTo(new MyScalaClass(<some args>))))

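The first option creates a mock, not a spy, so it does not meet the need. Following the second suggestion, I changed the code to:

import org.mockito.{AdditionalAnswers, Mockito}

val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable().defaultAnswer(AdditionalAnswers.delegatesTo(obj)))

It turned out that the methods stubbed on this object still called the original methods; the stubs did not take effect.

Out of options, back to Google...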

quote from: https://github.com/mockito/mockito/issues/537

I think that if the subject of spy or mock method implements Serializable, then Mockito should return serializable instance with no exceptions and current behavior should be treated as a bug!

While this:

mock(class,
withSettings()
.serializable()
.spiedInstance(object)
.defaultAnswer(CALLS_REAL_METHODS));

indeed works, it should be treated only as a workaround until the framework fixes it's behavior.

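Following this suggestion, I changed the code to:

import org.mockito.Mockito

val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable()
          .spiedInstance(obj).defaultAnswer(CALLS_REAL_METHODS))

Of course, CALLS_REAL_METHODS showed up in red in the IDE (unresolved), so I tried writing my own Answer, but that still failed.

Finally it dawned on me that CALLS_REAL_METHODS must ship with Mockito itself, so I downloaded the Mockito source and found the import statement there:

import org.mockito.Answers.CALLS_REAL_METHODS

With this line added, the symbol is no longer red and resolves correctly, but a new error appeared: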

2021-08-07 00:07:30,232 - Exception in task 6.0 in stage 17.0 (TID 1437)
java.io.InvalidClassException: java.lang.Void; local class name incompatible with stream class name "void"
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:703)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1939)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1805)
	at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1770)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1595)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)

Back to Google once more...

quote from: https://stackoverflow.com/questions/56579623/mock-with-void-method-causes-local-class-name-incompatible-with-stream-class-na

The problem seems to be in the way that Mockito serializes its internal proxy classes. That only has a negative effect if the tasks / jobs you run within spark actually get serialized and deserialized.

In org.apache.spark.scheduler.ShuffleMapTask#runTask the task is deserialized. What Spark basically does at that point is:

new JavaDeserializationStream(new ByteBufferInputStream(ByteBuffer.wrap(this.taskBinary.value())), ClassLoader.getSystemClassLoader()).objIn.readObject()

which produces the exact error message vs.

new ObjectInputStream(new ByteArrayInputStream(this.taskBinary.value())).readObject()

which would work and parse the object properly.

In particular there seems to be a mismatch between how Java / Spark expects void methods to be serialized vs. what Mockito actually does: "java.lang.Void" / "Void" vs. "void".

Luckily Mockito lets you specify the way it serializes its mocks:

MockSettings mockSettings = Mockito.withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS);
writer = mock(Writer.class, mockSettings);

After this change the test should work.

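Changed to:

import org.mockito.Answers.CALLS_REAL_METHODS
import org.mockito.Mockito
import org.mockito.mock.SerializableMode

val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS)
          .spiedInstance(obj).defaultAnswer(CALLS_REAL_METHODS))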

At last, the tests pass.
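The settings that finally worked can be wrapped in a small helper so that each test does not repeat them. This is a sketch under assumptions: `SerializableSpy` and `Greeter` are hypothetical names, and `Greeter` merely stands in for a class like `Clue`. It uses only the Mockito calls already shown above (`withSettings`, `serializable(SerializableMode.ACROSS_CLASSLOADERS)`, `spiedInstance`, `CALLS_REAL_METHODS`).

```scala
import org.mockito.Answers.CALLS_REAL_METHODS
import org.mockito.Mockito
import org.mockito.mock.SerializableMode

import scala.reflect.ClassTag

object SerializableSpy {
  /** Builds a spy-like mock of obj that Java serialization (and therefore Spark) can handle. */
  def apply[T <: AnyRef](obj: T)(implicit ct: ClassTag[T]): T =
    Mockito.mock(
      ct.runtimeClass.asInstanceOf[Class[T]],
      Mockito.withSettings()
        .serializable(SerializableMode.ACROSS_CLASSLOADERS)
        .spiedInstance(obj)
        .defaultAnswer(CALLS_REAL_METHODS)
    )
}

// Hypothetical class standing in for Clue
class Greeter extends Serializable {
  def greet(): String = "real"
  def shout(): String = greet().toUpperCase
}
```

Stubbing works as with a normal spy, for example `Mockito.when(s.greet()).thenReturn("stub")`; afterwards `s.shout()` also sees the stubbed value, because it calls `greet()` on the spy itself.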

References

https://medium.com/@bruno.bonanno/introduction-to-mockito-scala-ede30769cbda
https://mrpowers.medium.com/testing-spark-applications-8c590d3215fa
http://agiledon.github.io/blog/2014/01/13/testing-styles-of-scalatest/
https://stackoverflow.com/questions/32109910/how-to-mock-method-in-extended-trait-with-mockito