Spark-Scala Unit Testing in Practice
Setting Up the Unit Test Framework
The tools and frameworks used are as follows:
- scalatest_2.11 (3.0.0) - base testing framework
- mockito-scala_2.11 (1.16.37) - mocking framework
- spark-fast-tests_2.11 (0.23.0) - assertions (DataFrame comparison)
- scalatest-maven-plugin - Maven plugin that runs the tests
Example POM configuration:
<dependencies>
  <dependency>
    <groupId>org.scalatest</groupId>
    <artifactId>scalatest_2.11</artifactId>
    <version>3.0.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>com.github.mrpowers</groupId>
    <artifactId>spark-fast-tests_2.11</artifactId>
    <version>0.23.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-scala_2.11</artifactId>
    <version>1.16.37</version>
    <scope>test</scope>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest-maven-plugin</artifactId>
      <version>1.0</version>
      <configuration>
        <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
        <junitxml>.</junitxml>
        <filereports>WDF TestSuite.txt</filereports>
      </configuration>
      <executions>
        <execution>
          <id>test</id>
          <goals>
            <goal>test</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
  <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
</build>
Although Maven already had all the required jars configured, Spark initialization failed when the unit tests ran during packaging, with the following error:
org.apache.maven.surefire.util.SurefireReflectionException: java.lang.reflect.InvocationTargetException; nested exception is java.lang.reflect.InvocationTargetException: null
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
    at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/Module
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:200)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:196)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:196)
    at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:104)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:514)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.Module
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
The root cause is a jar conflict: the Jackson jars need to be declared explicitly. Add the following dependencies to the POM:
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>2.6.7</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.6.7</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>2.6.7</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-scala_2.11</artifactId>
  <version>2.6.7.1</version>
</dependency>
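As a side note (a general Maven diagnostic, not part of the original write-up), running mvn dependency:tree -Dincludes=com.fasterxml.jackson.core shows exactly which Jackson versions the transitive dependencies pull in, which makes it easier to decide what to pin.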
ScalaTest offers seven testing styles: FunSuite, FlatSpec, FunSpec, WordSpec, FreeSpec, PropSpec, and FeatureSpec.
This project uses FunSuite, which is flexible and closest to the traditional xUnit way of writing tests; a quick comparison with FlatSpec is sketched below.
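As a rough illustration of the difference (my own sketch, not code from the project), here is the same trivial check written in the FunSuite and FlatSpec styles:

import org.scalatest.{FlatSpec, FunSuite}

// FunSuite: plain, xUnit-like named tests
class StyleFunSuite extends FunSuite {
  test("strip non-alphanumeric characters") {
    assert("a,b.".replaceAll("[^a-zA-Z0-9]", "") == "ab")
  }
}

// FlatSpec: behaviour-driven "X should Y" sentences
class StyleFlatSpec extends FlatSpec {
  "stripping" should "remove non-alphanumeric characters" in {
    assert("a,b.".replaceAll("[^a-zA-Z0-9]", "") == "ab")
  }
}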
Example
Testing a UDF defined in the class below.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

class Clue(spark: SparkSession, ds: String) extends TDW with Serializable {
  // Strip punctuation and whitespace, keeping only Chinese characters, English letters and digits
  val removeSymbolUdf: UserDefinedFunction = udf((title: String) => {
    val pattern = "[\\u4e00-\\u9fa5a-zA-Z0-9]+".r
    val buffer = new StringBuilder()
    pattern.findAllMatchIn(title).foreach(m => buffer.append(m.group(0)))
    buffer.toString()
  })
}
A trait that initializes Spark for the tests:
import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[*]")
      .appName("spark test example")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .getOrCreate()
  }
}
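An optional tweak not present in the original wrapper: local test runs are usually noticeably faster if the shuffle partition count is capped, since Spark's default of 200 partitions is far more than tiny test DataFrames need. A sketch of such a variant (the trait name is my own):

import org.apache.spark.sql.SparkSession

trait FastSparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[*]")
      .appName("spark test example")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .config("spark.sql.shuffle.partitions", "1") // fewer partitions, faster small-data tests
      .getOrCreate()
  }
}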
The unit test class:
import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class ClueTest extends FunSuite with SparkSessionTestWrapper with MockitoSugar
  with DataFrameComparer with Serializable {

  // save is overridden with a no-op so the test never writes to a real table
  private val obj = new Clue(null, "20210806") {
    override def save(spark: SparkSession, table: String, df: DataFrame, impDate: String): Unit = {}
  }

  test("testRemoveSymbolUdf") {
    import spark.implicits._
    val sourceDF = Seq(
      "1234這裡this是is一個a測%試test案$例case ",
      "這裡是。.,;*&?*;&……一個1234測試案例"
    ).toDF("title")
    val actualDF = sourceDF.withColumn("title", obj.removeSymbolUdf(col("title")))
    val expectedDF = Seq(
      "1234這裡this是is一個a測試test案例case",
      "這裡是一個1234測試案例"
    ).toDF("title")
    // assertSmallDataFrameEquality is provided by spark-fast-tests' DataFrameComparer and compares two DataFrames
    assertSmallDataFrameEquality(actualDF, expectedDF)
  }
}
Execution Results
If no error is reported, the run has succeeded.
When Maven packages the project, all of the tests are executed as part of the build.
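The usual Maven commands apply: mvn clean test compiles the project and runs only the tests, while mvn clean package runs them as part of building the jar (the scalatest-maven-plugin's test goal is bound to Maven's test phase).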
Mocking Framework
The mocking framework used is mockito-scala.
Partial Mocks
mock[Class] mocks every method of a class; each mocked method returns Mockito's default value for its return type. To mock everything but fall back to the real implementation for a few selected methods, you can write the following (a verification sketch follows the example):
import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class TestMock {
  def doSomethingA(): String = "doSomethingA not mocked"
  def doSomethingB(): String = "doSomethingB not mocked"
}

class TestMockSuite1 extends FunSuite with MockitoSugar {
  test("testMock") {
    val m = mock[TestMock]
    doCallRealMethod.when(m).doSomethingA()
    println(m.doSomethingA()) // prints "doSomethingA not mocked"
    println(m.doSomethingB()) // prints an empty string
  }
}
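As a follow-up sketch (my own, assuming verify is exposed through MockitoSugar just like in classic Mockito), interactions on the partial mock can also be verified:

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class TestMockVerifySuite extends FunSuite with MockitoSugar {
  test("verify interactions on a partial mock") {
    val m = mock[TestMock]
    doCallRealMethod.when(m).doSomethingA()
    m.doSomethingA()
    verify(m).doSomethingA()    // passes: doSomethingA was invoked exactly once
    // verify(m).doSomethingB() // would fail: doSomethingB was never invoked
  }
}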
There is also the spy approach: methods that are not stubbed call the real implementation, while stubbed methods return the mocked behaviour.
import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class TestMock {
  def doSomethingA(): String = "doSomethingA not mocked"
  def doSomethingB(): String = "doSomethingB not mocked"
}

class TestMockSuite2 extends FunSuite with MockitoSugar {
  test("testMock") {
    val m = spy(new TestMock())
    println(m.doSomethingA()) // prints "doSomethingA not mocked"
    println(m.doSomethingB()) // prints "doSomethingB not mocked"
    when(m.doSomethingB()).thenReturn("mocked") // this still calls the real method once and only swaps the return value
    // doReturn("mocked!").when(m).doSomethingB() // this form avoids calling the real method at all
    println(m.doSomethingA()) // prints "doSomethingA not mocked"
    println(m.doSomethingB()) // prints "mocked"
  }
}
Mocking Methods Inherited from a Trait
This case cannot be handled by mocking the trait directly; one workable approach is to create a subclass that overrides the method and spy on an instance of it.
Example
trait A {
  def doSomethingA(): String = "not mocked, invoke trait A.doSomethingA"
}

class B extends A
Mocking it like this does not work:
val a = mock[A]
when(a.doSomethingA()).thenReturn("mocked")
val b = new B()
println(b.doSomethingA()) // prints: not mocked, invoke trait A.doSomethingA
A mock that does work looks like this:
import org.mockito.Answers.CALLS_REAL_METHODS
import org.mockito.Mockito

val obj = new B {
  override def doSomethingA(): String = "mocked"
}
val t = Mockito.mock(obj.getClass, Mockito.withSettings()
  .spiedInstance(obj).defaultAnswer(CALLS_REAL_METHODS))
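A brief usage sketch of the workaround (my own illustration): calls on t are dispatched to the spied subclass instance, so the trait method now appears mocked while any other methods keep their real behaviour.
println(t.doSomethingA()) // prints: mocked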
Mocking a Scala Object
Scala objects can be mocked with inline mocking. Any test code that depends on the mocked object must be written inside the braces of withObjectMocked[Class], as shown below (a fuller test sketch follows the snippet):
class D {
  def value(): String = "D_value"
}

class DMock extends D {
  override def value(): String = "mocked"
}

object D {
  def apply(): D = new D()
}

withObjectMocked[D.type] {
  when(D.apply()).thenReturn(new DMock())
  // test code that needs the mocked D goes here
}
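For illustration (a sketch of my own, assuming withObjectMocked and when come from MockitoSugar as in the snippet above), the mock only applies inside the block:

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class DObjectSuite extends FunSuite with MockitoSugar {
  test("withObjectMocked only applies inside the block") {
    withObjectMocked[D.type] {
      when(D.apply()).thenReturn(new DMock())
      assert(D().value() == "mocked") // apply() is mocked inside the block
    }
    assert(D().value() == "D_value") // outside the block the real apply() is back
  }
}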
When you mock an object, Mockito reports the following error (a Scala object compiles to a final class, which the default mock maker cannot handle):
org.mockito.exceptions.base.MockitoException:
Cannot mock/spy class com.pkslow.basic.MockitoTest$FinalPumpkin
Mockito cannot mock/spy because :
- final class
The error above is quoted from "How to mock scala object" (mockito-scala issue #303): https://github.com/mockito/mockito-scala/issues/303
The fix is to add a file named org.mockito.plugins.MockMaker under src/test/resources/mockito-extensions with the content mock-maker-inline.
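Concretely, the resource is a one-line text file (this is the standard Mockito inline-mock-maker configuration):
src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker:
mock-maker-inline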
With that in place, however, packaging fails with:
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Could not initialize plugin: interface org.mockito.plugins.MockMaker
at com.dummy.feed.query.TestUtils.getRequestData(TestUtils.java:879)
at com.dummy.feed.query.resultsetfilters.TestResultSetFiltersManager.<init>(TestResultSetFiltersManager.java:63)
... 53 more
Caused by: java.lang.IllegalStateException: Could not initialize plugin: interface org.mockito.plugins.MockMaker
at org.mockito.internal.configuration.plugins.PluginLoader$1.invoke(PluginLoader.java:66)
at com.sun.proxy.$Proxy10.isTypeMockable(Unknown Source)
at org.mockito.internal.util.MockUtil.typeMockabilityOf(MockUtil.java:29)
at org.mockito.internal.util.MockCreationValidator.validateType(MockCreationValidator.java:22)
at org.mockito.internal.creation.MockSettingsImpl.validatedSettings(MockSettingsImpl.java:186)
at org.mockito.internal.creation.MockSettingsImpl.confirm(MockSettingsImpl.java:180)
at org.mockito.internal.MockitoCore.mock(MockitoCore.java:62)
at org.mockito.Mockito.mock(Mockito.java:1729)
at org.mockito.Mockito.mock(Mockito.java:1642)
...
Caused by: java.lang.IllegalStateException: Error during attachment using: net.bytebuddy.agent.ByteBuddyAgent$AttachmentProvider$Compound@a5092e63
at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:379)
at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:353)
at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:321)
at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:307)
at org.mockito.internal.creation.bytebuddy.InlineByteBuddyMockMaker.<clinit>(InlineByteBuddyMockMaker.java:102)
Quoted from: https://stackoverflow.com/questions/49767683/what-does-no-compatible-attachment-provider-is-available-mean
After some debugging I found the problem. The message "No compatible attachment provider is not available" occurs if the agent was called with a jre instead of a jdk.
Unfortunately calling java -version does not return whether java is a jdk or a jre (the message is displaying Java Runtime Environment for both).
In my case (OS: Windows) it was tricky, because newer jdk installations attach C:\ProgramData\Oracle\Java\javapath to the system path, which contains a jre and not a jdk. The formerly added entry %JAVA_HOME%/bin got hidden through this modification. When I removed the entry C:\ProgramData\Oracle\Java\javapath everything worked fine.
ByteBuddyAgent.install works when it is invoked under a JDK, but fails when invoked under a JRE. Newer JDK installers also install a public JRE whose path entry takes precedence over the JDK's own JRE, so the fix is to uninstall that JRE. (A JDK installation actually ships two JREs, one inside the JDK and one public JRE; removing the public one is safe.)
Mocking Objects Created with new
Suppose we want to mock the object that is created with new inside the method below:
import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class D {
  def value(): String = "D_value"
}

class TestMock {
  def testNewObject(): String = new D().value()
}

class TestMockSuiteD extends FunSuite with MockitoSugar {
  test("testMock") {
    val d = mock[D]
    when(d.value()).thenReturn("mocked") // this mock has no effect on the instance created with new inside the method
    val m = spy(new TestMock())
    println(m.testNewObject()) // prints D_value
  }
}
Under ScalaTest plus mockito-scala there is no direct way to intercept an object created with new inside a method, which is a real pain in an object-oriented language. The practical workaround is to move the new call into a factory method on the companion object; the companion object can then be mocked, which effectively lets us control the instance that new would have produced.
import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class D {
  def value(): String = "D_value"
}

class DMock extends D {
  override def value(): String = "mocked"
}

object D {
  def apply(): D = new D()
}

class TestMock {
  def testNewObject(): String = D().value()
}

class TestMockSuiteD extends FunSuite with MockitoSugar {
  test("testMock") {
    withObjectMocked[D.type] {
      when(D.apply()).thenReturn(new DMock())
      val m = spy(new TestMock())
      println(m.testNewObject()) // prints "mocked"
    }
  }
}
Serialization Issues
When writing Spark unit tests you will often run into:
Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:406)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
...
Caused by: java.io.NotSerializableException: org.mockito.internal.creation.DelegatingMethod
Serialization stack:
- object not serializable (class: org.mockito.internal.creation.DelegatingMethod, value: org.mockito.internal.creation.DelegatingMethod@a97f2bff)
- field (class: org.mockito.internal.invocation.ScalaInvocation, name: mockitoMethod, type: interface org.mockito.internal.invocation.MockitoMethod)
- object (class org.mockito.internal.invocation.ScalaInvocation, clue.toString();)
- field (class: org.mockito.internal.invocation.InvocationMatcher, name: invocation, type: interface org.mockito.invocation.Invocation)
- object (class org.mockito.internal.invocation.InvocationMatcher, clue.toString();)
The error says something is not serializable.
Our own class already implements Serializable, so why does serialization still fail? The culprit is spy(new A()): the spied object is created by Mockito, and internally it references things that are not serializable (such as the DelegatingMethod in the stack trace above), hence the error.
For someone not familiar with the Mockito source, the first line of defence is Google.
Quoted from: https://stackoverflow.com/questions/53820877/spark-scala-mocking-task-not-serializable
Mocks are not serialisable by default, as it's usually a code smell in unit testing.
You can try enabling serialisation by creating the mock like
mock[MyType](Mockito.withSettings().serializable())
and see what happens when spark tries to use it. BTW, I recommend you to use mockito-scala instead of the traditional mockito as it may save you some other problems.
Spying in mockito-scala works exactly the same as regular mockito... What you can try is a mock with delegate which is how spyLambda works, i.e.
Mockito.mock(classOf[MyType], Mockito.withSettings().serializable().defaultAnswer(AdditionalAnswers.delegatesTo(new MyScalaClass(<some args>))))
The first option above creates a mock, not a spy, so it does not meet our needs. Following the second suggestion instead, the code was changed to:
val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable().defaultAnswer(AdditionalAnswers.delegatesTo(new Clue())))
But the stubbed methods on the resulting object simply invoked the real implementations; the stubbing did not take effect.
Nothing for it but more Google...
Quoted from: https://github.com/mockito/mockito/issues/537
I think that if the subject of spy or mock method implements Serializable, then Mockito should return a serializable instance with no exceptions and the current behavior should be treated as a bug!
While this:
mock(class, withSettings().serializable().spiedInstance(object).defaultAnswer(CALLS_REAL_METHODS));
indeed works, it should be treated only as a workaround until the framework fixes its behavior.
Following that workaround, the code was changed as follows:
val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable()
.spiedInstance(new Clue()).defaultAnswer(CALLS_REAL_METHODS))
Of course CALLS_REAL_METHODS showed up as unresolved in the IDE, so at first I tried to write my own Answer, but that still ended in errors.
Then it dawned on me that CALLS_REAL_METHODS must be something Mockito itself ships, so I pulled down the Mockito source and found the import statement:
import org.mockito.Answers.CALLS_REAL_METHODS
With that line added, the symbol resolved and imported correctly, but a new error appeared:
2021-08-07 00:07:30,232 - Exception in task 6.0 in stage 17.0 (TID 1437)
java.io.InvalidClassException: java.lang.Void; local class name incompatible with stream class name "void"
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:703)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1939)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1805)
at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1595)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
Back to Google once more...
The problem seems to be in the way that Mockito serializes its internal proxy classes. That only has a negative effect if the tasks / jobs you run within spark actually get serialized and deserialized.
In org.apache.spark.scheduler.ShuffleMapTask#runTask the task is deserialized. What Spark basically does at that point is:
new JavaDeserializationStream(new ByteBufferInputStream(ByteBuffer.wrap(this.taskBinary.value())), ClassLoader.getSystemClassLoader()).objIn.readObject()
which produces the exact error message vs.
new ObjectInputStream(new ByteArrayInputStream(this.taskBinary.value())).readObject()
which would work and parse the object properly.
In particular there seems to be a mismatch between how Java / Spark expects void methods to be serialized vs. what Mockito actually does: "java.lang.Void" / "Void" vs. "void".
Luckily Mockito lets you specify the way it serializes its mocks:
MockSettings mockSettings = Mockito.withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS);
writer = mock(Writer.class, mockSettings);
After this change the test should work.
So the final change was:
val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS)
.spiedInstance(obj).defaultAnswer(CALLS_REAL_METHODS))
And it finally ran successfully.
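For completeness, the imports that the final snippet relies on (standard Mockito classes; SerializableMode lives in org.mockito.mock):

import org.mockito.Answers.CALLS_REAL_METHODS
import org.mockito.Mockito
import org.mockito.mock.SerializableMode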
References
https://medium.com/@bruno.bonanno/introduction-to-mockito-scala-ede30769cbda
https://mrpowers.medium.com/testing-spark-applications-8c590d3215fa
http://agiledon.github.io/blog/2014/01/13/testing-styles-of-scalatest/
https://stackoverflow.com/questions/32109910/how-to-mock-method-in-extended-trait-with-mockito