Spark程式進行單元測試-使用scala
阿新 • • 發佈:2018-12-25
Spark 中進行一些單元測試技巧:
最近剛寫了一點Spark上的單元測試,大概整理了一些
rdd測試
spark程式一般從叢集中讀取資料然後通過rdd進行轉換,這其中涉及到叢集,每次修改bug,上傳到叢集再執行測試,代價還是挺大;所以儘可能先本地進行單元測試,以減少在叢集上執行時錯誤,特別是map等各種tranforms動作的邏輯錯誤;以下示例用於測試本地返回rdd相關的方法(利用spark本地模式進行單元測試)
Tips:
//定義一個簡單的wordcount
object WordCount extends Serializable{
def count(lines:RDD[String]): RDD[(String,Int)]={
val rdd=lines.flatMap(line=>line.split("\\s")).map(word=>(word,1)).reduceByKey(_ + _)
rdd
}
}
//引入scalatest建立一個單元測試類,混入特質BeforeAndAfter,在before和after中分別初始化sc和停止sc,
//初始化SparkContext時只需將Master設定為local(local[N],N表示執行緒)即可,無需本地配置或搭建叢集,
class WordCountTests extends FlatSpec with BeforeAndAfter{
val master="local" //sparkcontext的執行master
var sc:SparkContext=_
it should("test success") in{
//其中引數為rdd或者dataframe可以通過通過簡單的手動構造即可
val seq=Seq("the test test1","the test","the")
val rdd=sc.parallelize(seq)
val wordCounts=WordCount.count(rdd)
wordCounts.map(p=>{
p._1 match {
case "the"=>
assert(p._2==3)
case "test"=>
assert(p._2==2)
case "test1"=>
assert(p._2==1)
case _=>
None
}
}).foreach(_=>())
}
//這裡before和after中分別進行sparkcontext的初始化和結束,如果是SQLContext也可以在這裡面初始化
before{
val conf=new SparkConf()
.setAppName("test").setMaster(master)
sc=new SparkContext(conf)
}
after{
if(sc!=null){
sc.stop()
}
}
}
無返回值方法測試
有時候一個方法起到一個呼叫流程的作用,最後可能是輸出或者寫入某個檔案而沒有返回值,一般單元測試可能是檢視最後有沒有輸出檔案,但是ide在本地可能不太好進行測試
例如:
trait WriterHandle{
def writer(records:Seq[GenericRecord]):Unit={
val parquetWriter=...
records.foreach(parquetWriter.writer(..))
}
}
//一個類處混入這個特質,經過一定轉換後將結果資料寫入parquet中
class ProcessHandle(objects:Iterator[T]) extends Serializable with WriterHandle{
def process():Unit={
val records:Seq[GenericRecord]=build(objects)={
...
}
//這裡呼叫了特質writer中的writer方法,實際單元測試執行到這裡可能寫入的時候會出錯,不能正常測試通過
writer(records)
}
}
class Writertests extends FlatSpec {
it should("write success") in{
val objects=Seq(object1,object2..).toIterator
//在new處理類,混入原先特質的一個子特質
val process=new ProcessHandle(objects) with Writerhandletest
}
}
//可以自定義一個trait繼承自原先的特質,通過將原先的方法覆蓋,然後在重寫後的方法裡面的根據傳入值定義所需要斷言即可
trait Writerhandletest extends WriterHandle{
override def writer(records:Seq[GenericRecord]):Unit={
assert(records.length==N)
assert(records(0).XX=="xxx")
}
}
如有必要也可以測試下私有方法:
理論上來說,私有方法都會被公有方法呼叫,呼叫公有方法也可以驗證私有方法,不過如果公有方法不方便測試也可以對某個私有方法進行測試,就看是否有必要
可以測試如下:
class MyTest(s:String){
//此公有方法可能不方便測試
def ():Unit={
...
doSth(s)
}
//這裡私有方法,可能是邏輯關鍵所在,有必要測試
private def doSth(s:String):String={
...
}
}
編寫單元測試
//要混入PrivateMethodTester特質
class MytestTests extends FlatSpec with PrivateMethodTester{
it should("write success") in{
//首先new一個要測試的類
val myTest=new MyTest("string")
//其中通過PrivateMethod修飾,[]中為返回值, ('method)單引號後跟私有方法名
val dosth=PrivateMethod[String]('doSth)
//通過invokePrivate 委託呼叫私有方法,注意引數要對,貌似傳null的話會找不到對應的方法
val str=myTest invokePrivate dosth("string")
//最後斷言相應的至即可
asset(str=="string")
}
}