1. 程式人生 > 其它 > 資料質量 — 使用 Amazon Deequ 作為 Spark ETL 資料質量檢測工具

資料質量 — 使用 Amazon Deequ 作為 Spark ETL 資料質量檢測工具

目前,公司裡的資料質量檢測是通過配置規則報警來實現的;對於有些表,需要用 Shell 指令碼封裝 Hive SQL 來進行檢測,在時效性和準確性上都不能很好地滿足需求,故嘗試使用 Deequ 來做質量檢測工具。

一、官網示例

package org.shydow.deequ

import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.constraints.ConstraintStatus
import com.amazon.deequ.{VerificationResult, VerificationSuite}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Entry point that builds a small in-memory data set, runs the checks from
 * `DeequCheckRules.createRule` against it and prints the outcome.
 *
 * @author shydow
 * @date 2022-03-25
 */
object DQService {

  /**
   * Starts a local Spark session, verifies five sample `Item` rows and prints either a
   * success message or one line per constraint whose status is not `Success`.
   *
   * @param args command-line arguments; not read
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("DQC")
      .master("local[*]")
      .getOrCreate()

    // Close the session even when building the data or running the checks throws.
    try {
      val sc: SparkContext = spark.sparkContext
      sc.setLogLevel("WARN")

      // NOTE(review): `Item` is not declared in the visible source; presumably a case class
      // with fields (id, productName, description, priority, numViews) -- confirm.
      val source: RDD[Item] = sc.parallelize(Seq(
        Item(1, "Thingy A", "awesome thing.", "high", 0),
        Item(2, "Thingy B", "available at http://thingb.com", null, 0),
        Item(3, null, null, "low", 5),
        Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
        Item(5, "Thingy E", null, "high", 12)
      ))
      val sourceDF: DataFrame = spark.createDataFrame(source)
      sourceDF.printSchema()

      // Data quality verification
      val result: VerificationResult = DeequCheckRules.createRule(sourceDF)

      if (result.status == CheckStatus.Success) {
        println("The data passed the test, everything is fine!")
      } else {
        println("We found errors in the data:\n")

        val resultsForAllConstraints = result.checkResults
          .flatMap { case (_, checkResult) => checkResult.constraintResults }

        resultsForAllConstraints
          .filter(_.status != ConstraintStatus.Success)
          .foreach { failed =>
            // `message` is an Option: use a placeholder rather than `.get`, which would
            // throw if a non-successful constraint carried no message.
            val detail = failed.message.getOrElse("no message available")
            println(s"${failed.constraint}: $detail")
          }
      }
    } finally {
      spark.close()
    }
  }
}
package org.shydow.deequ

import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.DataFrame

/**
 * Holds Deequ verification rules that can be run against a DataFrame.
 *
 * @author shydow
 * @date 2022-03-25
 */
object DeequCheckRules {

  /**
   * Custom rule 1: runs a single error-level check against `df`.
   *
   * @param df data to verify; the check references the columns `id`, `productName`,
   *           `priority`, `numViews` and `description`
   * @return the result produced by running the verification suite
   */
  def createRule(df: DataFrame): VerificationResult = {
    val allowedPriorities = Array("high", "low")

    val unitCheck = Check(CheckLevel.Error, "this a unit test")
      // the data set must hold exactly 5 rows
      .hasSize(rowCount => rowCount == 5)
      // `id` must have no missing values
      .isComplete("id")
      // `id` must be unique
      .isUnique("id")
      // `productName` must have no missing values
      .isComplete("productName")
      // `priority` may only take one of the two allowed values
      .isContainedIn("priority", allowedPriorities)
      // `numViews` must not contain negative numbers
      .isNonNegative("numViews")
      // at least half of the `description` values must contain a URL
      .containsURL("description", fraction => fraction >= 0.5)
      // the approximate 0.5 quantile of `numViews` must not exceed 10
      .hasApproxQuantile("numViews", 0.5, median => median <= 10)

    VerificationSuite()
      .onData(df)
      .addCheck(unitCheck)
      .run()
  }
}

 

二、生產中配置的一些規則

/**
 * Runs two groups of checks against `df`: error-level "base checks" and
 * warning-level "distribution checks".
 *
 * The column names used here (`primaryKey`, `uniqueKey`, `priority`, `numViews`,
 * `column1`, `column2`, `description`) must exist in `df`.
 *
 * @param df data to verify
 * @return the result produced by running the verification suite
 */
def odsTableRule(df: DataFrame): VerificationResult = {
  VerificationSuite()
    .onData(df)
    .addCheck(
      Check(CheckLevel.Error, "base checks")
        .isComplete("primaryKey") // the primary column must have no missing values
        .isUnique("uniqueKey") // the unique key must not repeat
        .isContainedIn("priority", Array("high", "low")) // only the enumerated values are allowed
        .isNonNegative("numViews") // assert the column holds no negative numbers
        .satisfies(
          "abs(column1 - column2) <= 0.20 * column2",
          "value(column1) lies between value(column2)-20% and value(column2)+20%"
        ) // custom condition: |column1 - column2| must be within 0.2 * column2
    )
    .addCheck(
      Check(CheckLevel.Warning, "distribution checks")
        .containsURL("description", _ >= 0.5) // assert at least half of the values contain a URL
        .hasApproxQuantile("numViews", 0.5, _ <= 10) // assert the approximate median is at most 10
    )
    .run()
}