手寫一個DQC(一)DQC簡介及資料解析
阿新 • 發佈:2022-05-24
一:DQC核心流程
Define:資料質檢規則(指標)的定義。
你要告警給誰,你要使用什麼方式告警(郵件,即時訊息),你的規則是什麼(空值,波動)等
Measure:資料質檢任務的執行
資料儲存在哪裡:Hive、MySQL 是基本的資料庫,此外還有 ClickHouse(CK)、Kylin 等
Analyze:資料質檢結果量化及視覺化展示。
分為兩種情況:(一)不需要圖形化介面,直接在排程裡面進行bash配置,使用自定義程式碼解析(二)有圖形化介面,可以操作和檢視歷史結果
二:DQC標準
Accuracy:準確性。如是否符合表的加工邏輯。
Completeness:完備性。如資料是否存在丟失。
Timeliness:及時性。如表資料是否按時產生。
Uniqueness:唯一性。如主鍵欄位是否唯一。
Validity:合規性。如欄位長度是否合規、列舉值集合是否合規。
Consistency:一致性。如表與表之間在某些欄位上是否存在矛盾。
三:DQC規則
3.1 有效性
欄位長度有效、欄位內容有效、欄位數值範圍有效、列舉值個數有效、列舉值集合有效
3.2 唯一性
對主鍵是否存在重複資料的監控指標。
3.3 完整性
欄位是否為空或NULL、記錄數是否丟失、記錄數環比波動、記錄數波動範圍、記錄數方差檢驗。
3.4 準確性
數值同比、數值環比、數值方差檢驗、表邏輯檢查
3.5 一致性
表級別一致性檢查,外來鍵檢查
3.6 時效性
表級別質量監控指標,資料是否按時產出
3.7 資料剖析
最大值檢查、最小值檢查、平均值檢查、彙總值檢查
3.8 自定義規則檢查
使用者編寫自定義SQL實現的監控規則
從有效性、唯一性、完整性、準確性、一致性、時效性、資料剖析和自定義規則檢查等幾個維度對資料質量進行測量;但在如今資料量級極大的情況下,監控所有資料並不符合成本效益。
因此,需要先識別出哪些資料最為關鍵,再對這些關鍵資料進行全鏈路的資料質量監控,這樣有助於防止錯誤或發現改進的機會。
總結:指定值、空值、外來鍵規範、外來鍵最大最小、行數統計、最大、最小、平均、使用者自定義
四:樣例程式碼
// Sample invocation. Each element has the form "-name=value".
// MyArgs.build() parses these arguments, runs checkArgs over them, and returns the resulting MyArgs.
String[] argsa = {
        "-alter_user=xiaolong.wu",
        "-alter_type=3",                 // per the option description: 1 email, 2 WeCom, 3 both
        "-counttype=A0001",
        "-count_column=tag",
        "-databases=hive.tranadm.adm_fin_paytrigger_revenue_ds",
        "-dt=20220509",                  // yyyyMMdd
        "-filter_column=easysolar|tag",  // '|'-separated, paired with filter_column_value
        "-filter_column_value=1000|上一節點",
        "-assert_type=eq",
        "-assert_Stringue=200",
        "-assert_rate=0.2"
};
MyArgs myArgs = MyArgs.build(argsa);
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Describes one supported command-line option: its name, whether a value is required,
 * how the value is validated, and a human-readable description.
 */
class MyArgsItemInfo {
    private String optionName = "";
    private boolean necessary = false;
    /** Validation kind: "String", "enum", "float" or "date". */
    private String datatype = "";
    /**
     * Validation rule, interpreted according to the datatype:
     * String = regular expression the whole value must match;
     * enum   = '|'-separated list of allowed values;
     * float  = "min|max" inclusive numeric range;
     * date   = SimpleDateFormat pattern the value must match exactly.
     */
    private String dataverfiy = "";
    private String desc = "";

    public MyArgsItemInfo(String optionName, boolean necessary, String datetype, String dateverfiy, String desc) {
        this.optionName = optionName;
        this.necessary = necessary;
        this.datatype = datetype;
        this.dataverfiy = dateverfiy;
        this.desc = desc;
    }

    public String getOptionName() {
        return optionName;
    }

    /** Returns true when the option must be supplied with a non-empty value. */
    public boolean isNecessary() {
        return necessary;
    }

    public String getDatatype() {
        return datatype;
    }

    public String getDataverfiy() {
        return dataverfiy;
    }
}

/** Entry point for declaring the supported options through a fluent factory. */
class MyArgsInfo {
    public static MyArgsInfoBuildFactory init() {
        return new MyArgsInfoBuildFactory();
    }

    /** Accumulates option declarations; {@link #build()} returns the accumulated list. */
    public static class MyArgsInfoBuildFactory {
        private List<MyArgsItemInfo> buildFactoryerMyArgsItemInfo = new ArrayList<MyArgsItemInfo>();

        public List<MyArgsItemInfo> build() {
            return buildFactoryerMyArgsItemInfo;
        }

        /** Declares one option and returns this factory for chaining. */
        public MyArgsInfoBuildFactory addMyArgsItemInfo(String optionName, boolean necessary, String datetype, String dateverfiy, String desc) {
            buildFactoryerMyArgsItemInfo.add(new MyArgsItemInfo(optionName, necessary, datetype, dateverfiy, desc));
            return this;
        }
    }
}

/**
 * Validated DQC command-line arguments.
 *
 * <p>Create instances with {@link #build(String[])}, which parses "-name=value" arguments,
 * validates them against the option declarations in
 * {@link MyArgsBuildFactory#myArgsItemInfoInit()}, and throws on any violation.
 */
public class MyArgs {
    private String alter_user = "";          // alert recipient
    private String alter_type = "";          // alert channel: email, WeCom (enterprise WeChat)
    private String counttype = "";           // computation type, e.g. A0003
    private String count_column = "";        // column the computation is applied to
    private String databases = "";           // e.g. hive.ods.ods_user_active_di: db type, db name, table name
    private String dt = "";                  // date, yyyyMMdd
    private String filter_column = "";       // '|'-separated; accepts multiple columns
    private String filter_column_value = ""; // '|'-separated; one value per filter_column entry
    private String sql = "";                 // optional custom SQL
    private String assert_type = "";         // eq(==), lt(<), gt(>), le(<=), ge(>=), ne(!=)
    private String assert_Stringue = "";     // numeric threshold
    private String assert_rate = "";         // ratio in [0, 1]

    /**
     * Parses and validates command-line arguments.
     *
     * @param args arguments of the form "-name=value"
     * @return the validated arguments
     * @throws IllegalArgumentException if an argument is malformed, a required option is
     *         missing, or a value fails its validation rule
     */
    public static MyArgs build(String[] args) {
        return new MyArgsBuildFactory().build(args);
    }

    public MyArgs(MyArgsBuildFactory builder) {
        this.alter_user = builder.alter_user;
        this.alter_type = builder.alter_type;
        this.counttype = builder.counttype;
        this.count_column = builder.count_column;
        this.databases = builder.databases;
        this.dt = builder.dt;
        this.filter_column = builder.filter_column;
        this.filter_column_value = builder.filter_column_value;
        this.sql = builder.sql;
        this.assert_type = builder.assert_type;
        this.assert_Stringue = builder.assert_Stringue;
        this.assert_rate = builder.assert_rate;
    }

    /** Collects raw option values, validates them, and produces a {@link MyArgs}. */
    public static class MyArgsBuildFactory {
        private String alter_user = "";          // alert recipient
        private String alter_type = "";          // alert channel: email, WeCom (enterprise WeChat)
        private String counttype = "";           // computation type, e.g. count(1) / count(distinct ...)
        private String count_column = "";        // column the computation is applied to
        private String databases = "";           // e.g. hive.ods.ods_user_active_di: db type, db name, table name
        // Author's note: the default was meant to compare today's result with the previous day's;
        // this class only stores and validates the date.
        private String dt = "";
        private String filter_column = "";       // '|'-separated; accepts multiple columns
        private String filter_column_value = ""; // '|'-separated; one value per filter_column entry
        private String sql = "";
        private String assert_type = "";         // eq(==), lt(<), gt(>), le(<=), ge(>=), ne(!=)
        private String assert_Stringue = "";
        private String assert_rate = "";

        /**
         * Validates the current option values against their declarations.
         *
         * <p>Every problem found is printed to stdout, and then a single exception is thrown.
         *
         * @param myArgsInfo option declarations to validate against
         * @throws IllegalArgumentException if a required option is empty, a value violates its
         *         rule, or filter_column and filter_column_value have different element counts
         */
        public void checkArgs(List<MyArgsItemInfo> myArgsInfo) {
            List<String> errors = new ArrayList<String>();
            for (MyArgsItemInfo item : myArgsInfo) {
                String name = item.getOptionName();
                String value = getValueByName(name);
                if (value.equals("")) {
                    // Empty optional values are skipped; empty required ones are an error.
                    if (item.isNecessary()) {
                        errors.add(name + "引數為必填,不能為空");
                    }
                    continue;
                }
                String rule = item.getDataverfiy();
                switch (item.getDatatype()) {
                    case "String":
                        if (!value.matches(rule)) {
                            errors.add(name + "引數值存在問題,非" + rule + "正則表示式");
                        }
                        break;
                    case "enum":
                        if (!Arrays.asList(rule.split("\\|")).contains(value)) {
                            errors.add(name + "引數值存在問題,非" + rule + "列舉值");
                        }
                        break;
                    case "float":
                        if (!isFloatInRange(value, rule)) {
                            errors.add(name + "引數值存在問題,非" + rule + "範圍內的數值");
                        }
                        break;
                    case "date":
                        if (!isDate(value, rule)) {
                            errors.add(name + "引數值存在問題,非" + rule + "格式");
                        }
                        break;
                    default:
                        break;
                }
            }
            // Each filter column needs exactly one filter value.
            String[] filterColumns = filter_column.split("\\|");
            String[] filterValues = filter_column_value.split("\\|");
            if (filterColumns.length != filterValues.length) {
                errors.add("filter_column列長度和filter_column_value長度不一致");
            }
            if (!errors.isEmpty()) {
                for (String error : errors) {
                    System.out.println(error);
                }
                throw new IllegalArgumentException("引數校驗失敗: " + errors);
            }
        }

        /** Returns true if value parses as a float within the inclusive "min|max" range. */
        private static boolean isFloatInRange(String value, String range) {
            String[] bounds = range.split("\\|");
            float min = Float.parseFloat(bounds[0]);
            float max = Float.parseFloat(bounds[1]);
            float parsed;
            try {
                parsed = Float.parseFloat(value);
            } catch (NumberFormatException e) {
                return false;
            }
            // NaN compares false against both bounds, so reject it explicitly.
            return !Float.isNaN(parsed) && parsed >= min && parsed <= max;
        }

        /** Returns true if value is a real calendar date that matches the pattern in full. */
        private static boolean isDate(String value, String pattern) {
            SimpleDateFormat format = new SimpleDateFormat(pattern);
            format.setLenient(false); // reject e.g. month 13 instead of rolling it over
            ParsePosition position = new ParsePosition(0);
            // parse(String, ParsePosition) stops at the first unparsable char; require full consumption.
            return format.parse(value, position) != null && position.getIndex() == value.length();
        }

        /** Returns the current raw value of the named option, or "" for unknown names. */
        public String getValueByName(String getOptionName) {
            String data = "";
            switch (getOptionName) {
                case "alter_user": data = this.alter_user; break;
                case "alter_type": data = this.alter_type; break;
                case "counttype": data = this.counttype; break;
                case "count_column": data = this.count_column; break;
                case "databases": data = this.databases; break;
                case "dt": data = this.dt; break;
                case "filter_column": data = this.filter_column; break;
                case "filter_column_value": data = this.filter_column_value; break;
                case "sql": data = this.sql; break;
                case "assert_type": data = this.assert_type; break;
                case "assert_Stringue": data = this.assert_Stringue; break;
                case "assert_rate": data = this.assert_rate; break;
            }
            return data;
        }

        /** Declares every supported option with its requiredness, validation rule and description. */
        public List<MyArgsItemInfo> myArgsItemInfoInit() {
            MyArgsInfo.MyArgsInfoBuildFactory myArgsInfoBuildFactory = MyArgsInfo.init();
            myArgsInfoBuildFactory
                    .addMyArgsItemInfo("alter_user", true, "String", "[a-z]*.[a-z]*", "告警接收人")
                    .addMyArgsItemInfo("alter_type", true, "enum", "1|2|3", "1:郵件,2:企業微信,3郵件+企業微信")
                    .addMyArgsItemInfo("counttype", true, "enum", "A0001|A0002|A0003|A0004|A0005", "計算型別")
                    .addMyArgsItemInfo("count_column", true, "String", "[a-zA-Z0-9_]*", "列名")
                    // Dots are escaped so they match only the literal type.db.table separators.
                    .addMyArgsItemInfo("databases", true, "String", "(hive|mysql|kylin|clickhouse|hbase)\\.[a-z_]*\\.[a-z_]*", "hive.ods.ods_user_active_di 限定資料庫型別、庫名、表明")
                    .addMyArgsItemInfo("dt", false, "date", "yyyyMMdd", "時間")
                    .addMyArgsItemInfo("filter_column", true, "String", "[a-zA-Z0-9_\\|]*", "|分割 接受多個欄位")
                    // Filter values may be non-ASCII text, so allow any Unicode letter or digit.
                    .addMyArgsItemInfo("filter_column_value", true, "String", "[\\p{L}\\p{N}_\\|]*", "|分割 接受多個欄位")
                    .addMyArgsItemInfo("sql", false, "String", "[a-zA-Z\\*\\s]*", "自定義SQL")
                    .addMyArgsItemInfo("assert_type", true, "enum", "eq|lt|gt|le|ge|ne", "eq(==), lt(<), gt(>), le(<=), ge(>=), ne(!=)")
                    .addMyArgsItemInfo("assert_Stringue", true, "float", "-2099999999|2099999999", "限定數值")
                    .addMyArgsItemInfo("assert_rate", true, "float", "0|1", "限定比率");
            return myArgsInfoBuildFactory.build();
        }

        /**
         * Stores "-name=value" arguments into this factory.
         *
         * <p>Only the first '=' separates name from value, so values may contain '='.
         * Unknown option names are ignored.
         *
         * @throws IllegalArgumentException if an argument contains no '='
         */
        public void myArgsValueInit(String[] args) {
            for (String arg : args) {
                String argString = arg.startsWith("-") ? arg.substring(1) : arg;
                int separator = argString.indexOf('=');
                if (separator < 0) {
                    throw new IllegalArgumentException("引數格式錯誤,應為-name=value: " + arg);
                }
                String argName = argString.substring(0, separator);
                String argValue = argString.substring(separator + 1);
                switch (argName) {
                    case "alter_user": this.alter_user(argValue); break;
                    case "alter_type": this.alter_type(argValue); break;
                    case "counttype": this.counttype(argValue); break;
                    case "count_column": this.count_column(argValue); break;
                    case "databases": this.databases(argValue); break;
                    case "dt": this.dt(argValue); break;
                    case "filter_column": this.filter_column(argValue); break;
                    case "filter_column_value": this.filter_column_value(argValue); break;
                    case "sql": this.sql(argValue); break;
                    case "assert_type": this.assert_type(argValue); break;
                    case "assert_Stringue": this.assert_Stringue(argValue); break;
                    case "assert_rate": this.assert_rate(argValue); break;
                    default: break;
                }
            }
        }

        /**
         * Main entry: declares the options, loads the argument values, validates them, and
         * builds the result.
         *
         * @throws IllegalArgumentException if parsing or validation fails
         */
        public MyArgs build(String[] args) {
            List<MyArgsItemInfo> myArgsInfo = myArgsItemInfoInit();
            myArgsValueInit(args);
            checkArgs(myArgsInfo);
            return new MyArgs(this);
        }

        public MyArgsBuildFactory alter_user(String alter_user) { this.alter_user = alter_user; return this; }

        public MyArgsBuildFactory alter_type(String alter_type) { this.alter_type = alter_type; return this; }

        public MyArgsBuildFactory counttype(String counttype) { this.counttype = counttype; return this; }

        public MyArgsBuildFactory count_column(String count_column) { this.count_column = count_column; return this; }

        public MyArgsBuildFactory databases(String databases) { this.databases = databases; return this; }

        public MyArgsBuildFactory dt(String dt) { this.dt = dt; return this; }

        public MyArgsBuildFactory filter_column(String filter_column) { this.filter_column = filter_column; return this; }

        public MyArgsBuildFactory filter_column_value(String filter_column_value) { this.filter_column_value = filter_column_value; return this; }

        public MyArgsBuildFactory sql(String sql) { this.sql = sql; return this; }

        public MyArgsBuildFactory assert_type(String assert_type) { this.assert_type = assert_type; return this; }

        public MyArgsBuildFactory assert_Stringue(String assert_Stringue) { this.assert_Stringue = assert_Stringue; return this; }

        public MyArgsBuildFactory assert_rate(String assert_rate) { this.assert_rate = assert_rate; return this; }
    }

    public String getAlter_user() { return alter_user; }

    public String getAlter_type() { return alter_type; }

    public String getCounttype() { return counttype; }

    public String getCount_column() { return count_column; }

    public String getDatabases() { return databases; }

    public String getDt() { return dt; }

    public String getFilter_column() { return filter_column; }

    public String getFilter_column_value() { return filter_column_value; }

    public String getSql() { return sql; }

    public String getAssert_type() { return assert_type; }

    public String getAssert_Stringue() { return assert_Stringue; }

    public String getAssert_rate() { return assert_rate; }

    @Override
    public String toString() {
        return "MyArgs{" + "alter_user='" + alter_user + '\'' + ", alter_type=" + alter_type
                + ", counttype='" + counttype + '\'' + ", count_column='" + count_column + '\''
                + ", databases='" + databases + '\'' + ", dt='" + dt + '\''
                + ", filter_column='" + filter_column + '\'' + ", filter_column_value='" + filter_column_value + '\''
                + ", sql='" + sql + '\'' + ", assert_type='" + assert_type + '\''
                + ", assert_Stringue=" + assert_Stringue + ", assert_rate=" + assert_rate + '}';
    }
}