1. 程式人生 > >Spark程式進行單元測試-使用scala

Spark程式進行單元測試-使用scala

Spark 中進行一些單元測試技巧:

最近剛寫了一點Spark上的單元測試,大概整理了一些

rdd測試


spark程式一般從叢集中讀取資料然後通過rdd進行轉換,這其中涉及到叢集,每次修改bug,上傳到叢集再執行測試,代價還是挺大;所以儘可能先本地進行單元測試,以減少在叢集上執行時錯誤,特別是map等各種tranforms動作的邏輯錯誤;以下示例用於測試本地返回rdd相關的方法(利用spark本地模式進行單元測試)
Tips:
這裡寫圖片描述

//定義一個簡單的wordcount
object WordCount extends Serializable{ 
  def count(lines:RDD[String]): RDD[(String,Int)]={
    val
rdd=lines.flatMap(line=>line.split("\\s")).map(word=>(word,1)).reduceByKey(_ + _) rdd } }
//引入scalatest建立一個單元測試類,混入特質BeforeAndAfter,在before和after中分別初始化sc和停止sc,
//初始化SparkContext時只需將Master設定為local(local[N],N表示執行緒)即可,無需本地配置或搭建叢集,

class WordCountTests extends FlatSpec with BeforeAndAfter{
  val
master="local" //sparkcontext的執行master var sc:SparkContext=_ it should("test success") in{ //其中引數為rdd或者dataframe可以通過通過簡單的手動構造即可 val seq=Seq("the test test1","the test","the") val rdd=sc.parallelize(seq) val wordCounts=WordCount.count(rdd) wordCounts.map(p=>{ p._1 match
{ case "the"=> assert(p._2==3) case "test"=> assert(p._2==2) case "test1"=> assert(p._2==1) case _=> None } }).foreach(_=>()) } //這裡before和after中分別進行sparkcontext的初始化和結束,如果是SQLContext也可以在這裡面初始化 before{ val conf=new SparkConf() .setAppName("test").setMaster(master) sc=new SparkContext(conf) } after{ if(sc!=null){ sc.stop() } } }

無返回值方法測試


有時候一個方法起到一個呼叫流程的作用,最後可能是輸出或者寫入某個檔案而沒有返回值,一般單元測試可能是檢視最後有沒有輸出檔案,但是ide在本地可能不太好進行測試
例如:

trait WriterHandle{
   def writer(records:Seq[GenericRecord]):Unit={
     val parquetWriter=...
     records.foreach(parquetWriter.writer(..)) 
   }
}
//一個類處混入這個特質,經過一定轉換後將結果資料寫入parquet中
class ProcessHandle(objects:Iterator[T]) extends Serializable with WriterHandle{
  def process():Unit={
     val records:Seq[GenericRecord]=build(objects)={
        ...
     }
     //這裡呼叫了特質writer中的writer方法,實際單元測試執行到這裡可能寫入的時候會出錯,不能正常測試通過
     writer(records)
  }
}
class Writertests extends FlatSpec {
  it should("write success") in{
    val objects=Seq(object1,object2..).toIterator 
    //在new處理類,混入原先特質的一個子特質
    val process=new ProcessHandle(objects) with Writerhandletest 
  }
} 
//可以自定義一個trait繼承自原先的特質,通過將原先的方法覆蓋,然後在重寫後的方法裡面的根據傳入值定義所需要斷言即可
trait Writerhandletest extends WriterHandle{
  override def writer(records:Seq[GenericRecord]):Unit={
     assert(records.length==N)
     assert(records(0).XX=="xxx")
   }
}

如有必要也可以測試下私有方法:

理論上來說,私有方法都會被公有方法呼叫,呼叫公有方法也可以驗證私有方法,不過如果公有方法不方便測試也可以對某個私有方法進行測試,就看是否有必要
可以測試如下:


class  MyTest(s:String){
  //此公有方法可能不方便測試
  def ():Unit={
     ...
     doSth(s)
  }
  //這裡私有方法,可能是邏輯關鍵所在,有必要測試
  private def doSth(s:String):String={
     ...
  }
}

編寫單元測試

//要混入PrivateMethodTester特質
class MytestTests extends FlatSpec with PrivateMethodTester{
  it should("write success") in{

        //首先new一個要測試的類
    val myTest=new MyTest("string") 
       //其中通過PrivateMethod修飾,[]中為返回值, ('method)單引號後跟私有方法名 
    val dosth=PrivateMethod[String]('doSth)
       //通過invokePrivate 委託呼叫私有方法,注意引數要對,貌似傳null的話會找不到對應的方法
    val str=myTest invokePrivate dosth("string")
       //最後斷言相應的至即可
    asset(str=="string") 
  }
}