1. 程式人生 > 其它 >手寫一個DQC(一)DQC簡介及資料解析

手寫一個DQC(一)DQC簡介及資料解析

一:DQC核心流程

    Define:資料質檢規則(指標)的定義。
            你要告警給誰,你要使用什麼方式告警(郵件,即時訊息),你的規則是什麼(空值,波動)等
    Measure:資料質檢任務的執行
            資料在哪儲存:hive、mysql是基本的資料庫、CK、kylin等
    Analyze:資料質檢結果量化及視覺化展示。
            分為兩種情況:(一)不需要圖形化介面,直接在排程裡面進行bash配置,使用自定義程式碼解析(二)有圖形化介面,可以操作和檢視歷史結果

二:DQC標準

    Accuracy:準確性。如是否符合表的加工邏輯。
Completeness:完備性。如資料是否存在丟失。
Timeliness:及時性。如表資料是否按時產生。
Uniqueness:唯一性。如主鍵欄位是否唯一。
Validity:合規性。如欄位長度是否合規、列舉值集合是否合規。
Consistency:一致性。如表與表之間在某些欄位上是否存在矛盾。

三:DQC規則

2.1有效性
欄位長度有效、欄位內容有效、欄位數值範圍有效、列舉值個數有效、列舉值集合有效
2.2 唯一性
對主鍵是否存在重複資料的監控指標。
2.3 完整性
欄位是否為空或NULL、記錄數是否丟失、記錄數環比波動、錄數波動範圍、記錄數方差檢驗、
2.4 準確性
數值同比、數值環比、數值方差檢驗、表邏輯檢查
2.5 一致性
表級別一致性檢查,外來鍵檢查
2.6 時效性
表級別質量監控指標,資料是否按時產出
2.7資料剖析
最大值檢查、最小值檢查、平均值檢查、彙總值檢查
2.8 自定義規則檢查
使用者寫自定義SQL實現的監控規則
從有效性、唯一性、完整性、準確性、一致性、時效性、資料剖析和自定義規則檢查等幾個維度對資料質量進行測量,但對於現在超級大的資料量級監控所有的資料是不符合成本效率的。
因此,知道哪些資料為最關鍵的,對這些關鍵資料進行全鏈路的資料質量,這樣有助於防止錯誤或揭示改進的機會。

總結:指定值、空值、外來鍵規範、外來鍵最大最小、行數統計、最大、最小、平均、使用者自定義

四:樣例程式碼

           

String[] argsa={"-alter_user=xiaolong.wu","-alter_type=3","-counttype=A0001","-count_column=tag","-databases=hive.tranadm.adm_fin_paytrigger_revenue_ds","-dt=20220509","-filter_column=easysolar|tag","-filter_column_value=1000|上一節點","-assert_type=eq","-assert_Stringue=200","-assert_rate=0.2"};

MyArgs myArgs = MyArgs.build(argsa);

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MyArgsItemInfo{
    private String optionName  ="";
    private boolean necessary  = false;
    private String datatype  = "";
    private String dataverfiy  = ""; //  String:正則表示式   enum:|分割的列舉型別    float:最大值|最小值   date:日期格式
    private String desc  ="";

    public MyArgsItemInfo(String optionName, boolean necessary,String datetype,String dateverfiy, String desc) {
        this.optionName = optionName;
        this.necessary = necessary;
        this.datatype = datetype;
        this.dataverfiy = dateverfiy;
        this.desc = desc;
    }
    public String getOptionName() {
        return optionName;
    }
    public String getDatatype() {
        return datatype;
    }
    public String getDataverfiy() {
        return dataverfiy;
    }
}

class MyArgsInfo {

    public static MyArgsInfoBuildFactory init(){
        return new MyArgsInfoBuildFactory();
    }
    public static class MyArgsInfoBuildFactory {

        private List<MyArgsItemInfo> buildFactoryerMyArgsItemInfo = new ArrayList<MyArgsItemInfo>();

        public List<MyArgsItemInfo> build(){
            return buildFactoryerMyArgsItemInfo;
        }
        public MyArgsInfoBuildFactory addMyArgsItemInfo(String optionName, boolean necessary,String datetype,String dateverfiy, String desc) {
            buildFactoryerMyArgsItemInfo.add(new MyArgsItemInfo(optionName,necessary,datetype,dateverfiy,desc));
            return this;
        }
    }
}

public class MyArgs{
    private String alter_user = "";              //告警接收人
    private String alter_type = "";              //郵件,企業微信
    private String counttype = "";               //A0003
    private String count_column = "";            //
    private String databases  = "";              // hive.ods.ods_user_active_di  限定資料庫型別、庫名、表明
    private String dt = "";
    private String filter_column = "";           //,分割    接受多個欄位
    private String filter_column_value = "";   //,分割    接受多個欄位
    private String sql = "";
    private String assert_type = "";             //eq(==), lt(<), gt(>), le(<=), ge(>=), ne(!=)
    private String assert_Stringue = "";           //,
    private String assert_rate = "";               //,

    public static MyArgs build(String[] args) {
        return new MyArgsBuildFactory().build(args);
    }

    public MyArgs(MyArgsBuildFactory builder) {
        this.alter_user = builder.alter_user;
        this.alter_type = builder.alter_type ;
        this.counttype = builder.counttype;
        this.count_column = builder.count_column;
        this.databases = builder.databases;
        this.dt = builder.dt;
        this.filter_column = builder.filter_column;
        this.filter_column_value = builder.filter_column_value;
        this.sql = builder.sql;
        this.assert_type = builder.assert_type;
        this.assert_Stringue = builder.assert_Stringue;
        this.assert_rate = builder.assert_rate;
    }

    public static class MyArgsBuildFactory{
        private boolean checkFalse = false;

        private String alter_user = "";              //告警接收人
        private String alter_type = "";              //郵件,企業微信
        private String counttype = "";               //count(1) count(distinct )
        private String count_column = "";            //
        private String databases  = "";              // hive.ods.ods_user_active_di  限定資料庫型別、庫名、表明
        private String dt = "";                      // 預設比較今天和前一天的結果
        private String filter_column = "";           //,分割    接受多個欄位
        private String filter_column_value = "";   //,分割    接受多個欄位
        private String sql = "";
        private String assert_type = "";             //eq(==), lt(<), gt(>), le(<=), ge(>=), ne(!=)
        private String assert_Stringue = "";           //,
        private String assert_rate = "";               //,

        public void checkArgs(List<MyArgsItemInfo> myArgsInfo){
            for (int i = 0; i < myArgsInfo.size(); i++) {
                MyArgsItemInfo myArgsItemInfo = myArgsInfo.get(i);
                String datatype = myArgsItemInfo.getDatatype();
                String datavalue = getValueByName(myArgsItemInfo.getOptionName());
                //  String:正則表示式   enum:|分割的列舉型別    float:最大值|最小值   date:日期格式
                if(!datavalue.equals("")){   //空值的去掉
                    switch (datatype){
                        case "String" : {
                            if(!datavalue.matches(myArgsItemInfo.getDataverfiy())){
                                System.out.println(myArgsItemInfo.getOptionName()+"引數值存在問題,非"+myArgsItemInfo.getDataverfiy()+"正則表示式");
                                checkFalse = true;
                            }
                        };break;
                        case "enum" : {
                            if(!Arrays.asList(myArgsItemInfo.getDataverfiy().split("\\|")).contains(datavalue)){
                                System.out.println(myArgsItemInfo.getOptionName()+"引數值存在問題,非"+myArgsItemInfo.getDataverfiy()+"列舉值");
                                checkFalse = true;
                            }
                        }break;
                        case "float" : {
                            float minvalue = Float.valueOf(myArgsItemInfo.getDataverfiy().split("\\|")[0]);
                            float maxvalue = Float.valueOf(myArgsItemInfo.getDataverfiy().split("\\|")[0]);
                            if(Float.valueOf(datavalue)>maxvalue || Float.valueOf(datavalue)<minvalue){
                                checkFalse = true;
                            }
                        };break;
                        case "date" : {
                            SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
                            try {
                                format.parse(datavalue);
                            } catch (ParseException e) {
                                System.out.println(myArgsItemInfo.getOptionName()+"引數值存在問題,非"+myArgsItemInfo.getDataverfiy()+"格式");
                                checkFalse = true;
                            }
                        };break;
                    }

                }
            }
            //檢測 filter_column 長度和 filter_column_value長度相同
            String []filter_column_array = filter_column.split("\\|");
            String []filter_column_value_array = filter_column_value.split("\\|");

            if(filter_column_array.length!=filter_column_value_array.length){
                System.out.println("filter_column列長度和filter_column_value長度不一致");
                checkFalse = true;
            }

            if(checkFalse){
                new Exception();
            }
        }
        public String getValueByName(String getOptionName){
            String data = "";
            switch (getOptionName){
                case "alter_user" : data = this.alter_user;break;
                case "alter_type" : data = this.alter_type;break;
                case "counttype" : data = this.counttype;break;
                case "count_column" : data = this.count_column;break;
                case "databases" : data = this.databases;break;
                case "dt" : data = this.dt;break;
                case "filter_column" : data = this.filter_column;break;
                case "filter_column_value" : data = this.filter_column_value;break;
                case "sql" : data = this.sql;break;
                case "assert_type" : data = this.assert_type;break;
                case "assert_Stringue" : data = this.assert_Stringue;break;
                case "assert_rate" : data = this.assert_rate;break;
            }
            return data;
        }
        public List<MyArgsItemInfo> myArgsItemInfoInit(){
            MyArgsInfo.MyArgsInfoBuildFactory myArgsInfoBuildFactory = MyArgsInfo.init();
            myArgsInfoBuildFactory.addMyArgsItemInfo("alter_user", true,"String","[a-z]*.[a-z]*", "告警接收人")
                    .addMyArgsItemInfo("alter_type", true,"enum","1|2|3", "1:郵件,2:企業微信,3郵件+企業微信")
                    .addMyArgsItemInfo("counttype", true,"enum","A0001|A0002|A0003|A0004|A0005", "計算型別")
                    .addMyArgsItemInfo("count_column", true,"String","[a-zA-Z0-9_]*", "列名")
                    .addMyArgsItemInfo("databases", true,"String","(hive|mysql|kylin|clickhouse|hbase).[a-z_]*.[a-z_]*", "hive.ods.ods_user_active_di  限定資料庫型別、庫名、表明")
                    .addMyArgsItemInfo("dt", false,"date","yyyyMMdd", "時間")
                    .addMyArgsItemInfo("filter_column", true,"String","[a-zA-Z0-9_\\|]*", "|分割    接受多個欄位")
                    .addMyArgsItemInfo("filter_column_value", true,"String","[a-zA-Z0-9_\\|]*", "|分割    接受多個欄位")
                    .addMyArgsItemInfo("sql", false,"String","[a-zA-Z\\*\\s]*", "自定義SQL")
                    .addMyArgsItemInfo("assert_type", true,"enum","eq|lt|gt|le|ge|ne", "eq(==), lt(<), gt(>), le(<=), ge(>=), ne(!=)")
                    .addMyArgsItemInfo("assert_Stringue", true,"float","-2099999999|2099999999", "限定數值")
                    .addMyArgsItemInfo("assert_rate", true,"float","0|1", "限定比率");
            return myArgsInfoBuildFactory.build();
        }
        public void myArgsValueInit(String[] args){
            for (int i = 0; i < args.length; i++) {
                String argString = args[i].substring(1,args[i].length());
                String argName = argString.split("=")[0];
                String argValue = argString.split("=")[1];

                switch (argName){
                    case "alter_user" : this.alter_user(argValue);break;
                    case "alter_type" : this.alter_type(argValue);break;
                    case "counttype" : this.counttype(argValue);break;
                    case "count_column" : this.count_column(argValue);break;
                    case "databases" : this.databases(argValue);break;
                    case "dt" : this.dt(argValue);break;
                    case "filter_column" : this.filter_column(argValue);break;
                    case "filter_column_value" : this.filter_column_value(argValue);break;
                    case "sql" : this.sql(argValue);break;
                    case "assert_type" : this.assert_type(argValue);break;
                    case "assert_Stringue" : this.assert_Stringue(argValue);break;
                    case "assert_rate" : this.assert_rate(argValue);break;
                }
            }
        }
        //主要方法
        public MyArgs build(String[] args){
            List<MyArgsItemInfo> myArgsInfo = myArgsItemInfoInit();  //將列資訊初始化
            myArgsValueInit(args);                                   //將引數命令列過來的值初始化
            checkArgs(myArgsInfo);                                   //檢查命令列過來的值是否符合  列表達式
            return new MyArgs(this);
        }


        public MyArgsBuildFactory alter_user(String alter_user) {
            this.alter_user = alter_user;
            return this;
        }
        public MyArgsBuildFactory alter_type(String alter_type) {
            this.alter_type = alter_type;
            return this;
        }
        public MyArgsBuildFactory counttype(String counttype) {
            this.counttype = counttype;
            return this;
        }
        public MyArgsBuildFactory count_column(String count_column) {
            this.count_column = count_column;
            return this;
        }
        public MyArgsBuildFactory databases(String databases) {
            this.databases = databases;
            return this;
        }
        public MyArgsBuildFactory dt(String dt) {
            this.dt = dt;
            return this;
        }
        public MyArgsBuildFactory filter_column(String filter_column) {
            this.filter_column = filter_column;
            return this;
        }
        public MyArgsBuildFactory filter_column_value(String filter_column_value) {
            this.filter_column_value = filter_column_value;
            return this;
        }
        public MyArgsBuildFactory sql(String sql) {
            this.sql = sql;
            return this;
        }
        public MyArgsBuildFactory assert_type(String assert_type) {
            this.assert_type = assert_type;
            return this;
        }
        public MyArgsBuildFactory assert_Stringue(String assert_Stringue) {
            this.assert_Stringue = assert_Stringue;
            return this;
        }
        public MyArgsBuildFactory assert_rate(String assert_rate) {
            this.assert_rate = assert_rate;
            return this;
        }
    }
    public String getAlter_user() {
        return alter_user;
    }
    public String getAlter_type() {
        return alter_type;
    }
    public String getCounttype() {
        return counttype;
    }
    public String getCount_column() {
        return count_column;
    }
    public String getDatabases() {
        return databases;
    }
    public String getDt() {
        return dt;
    }
    public String getFilter_column() {
        return filter_column;
    }
    public String getFilter_column_value() {
        return filter_column_value;
    }
    public String getSql() {
        return sql;
    }
    public String getAssert_type() {
        return assert_type;
    }
    public String getAssert_Stringue() {
        return assert_Stringue;
    }
    public String getAssert_rate() {
        return assert_rate;
    }
    @Override
    public String toString() {
        return "MyArgs{" +
                "alter_user='" + alter_user + '\'' +
                ", alter_type=" + alter_type +
                ", counttype='" + counttype + '\'' +
                ", count_column='" + count_column + '\'' +
                ", databases='" + databases + '\'' +
                ", dt='" + dt + '\'' +
                ", filter_column='" + filter_column + '\'' +
                ", filter_column_value='" + filter_column_value + '\'' +
                ", sql='" + sql + '\'' +
                ", assert_type='" + assert_type + '\'' +
                ", assert_Stringue=" + assert_Stringue +
                ", assert_rate=" + assert_rate +
                '}';
    }
}