1. 程式人生 > 其它 >華為雲MRS基於Hudi和HetuEngine構建實時資料湖最佳實踐

華為雲MRS基於Hudi和HetuEngine構建實時資料湖最佳實踐

資料湖與實時資料湖是什麼?

各個行業企業都在構建企業級資料湖,將企業內多種格式資料來源匯聚的大資料平臺,通過嚴格的資料許可權和資源管控,將資料和算力開放給各種使用者。一份資料支援多種分析,是資料湖最大的特點。如果資料湖的資料,從資料來源產生後,可以在1分鐘以內實時進入到資料湖儲存,支援各種互動式分析,這種資料湖通常叫做實時資料湖,如果可以做到15分鐘之內,也可稱為準實時資料湖。構建實時資料湖,正在成為5G和IOT時代,支撐各個企業實時分析業務的資料湖新目標。

華為MRS實時資料湖方案介紹

  1. 生產庫資料通過CDC工具(debezium)實時錄入到MRS叢集中Kafka的指定topic裡;
  2. 在MRS叢集啟動一個SparkStreaming任務,實時讀取Kafka指定topic裡的資料;
  3. 同時該SparkStreaming任務將讀取到的資料進行解析處理並寫入到一張hudi表中;
  4. 寫入hudi表的同時可以指定該資料也寫入hive表;
  5. 通過MRS提供的互動式查詢引擎HetuEngine對資料進行快速的互動式查詢。

使用華為MRS實時資料湖方案的優勢:

  1. ACID事務能力得以保證,湖內一份資料滿足所有的分析業務需求,減少資料搬遷,減少資料冗餘;
  2. 資料一致性保證,保證增量資料與入湖後資料一致性檢測;
  3. 資料加工流轉,在一個儲存層內閉環,資料流動更高效;
  4. 基於HetuEngine引擎實現互動式查詢,效能不降低。

下面會針對方案的三個關鍵元件:CDC工具,資料儲存引擎Hudi,互動式查詢引擎HetuEngine進行詳細的介紹

樣例資料簡介

生產庫MySQL原始資料(前10條,共1000條):

CDC工具

簡介

CDC(changed data capture)為動態資料抓取,常見的方式分為同步和非同步。同步CDC主要是採用觸發器記錄新增資料,基本能夠做到實時增量抽取。而非同步CDC則是通過分析已經commit的日誌記錄來得到增量資料資訊。常見的CDC工具有Canal, DataBus, Maxwell, Debezium, OGG等。本方案採用debezium作為CDC工具

對接步驟

具體參考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/

完成對接後,針對MySQL生產庫分別做增、改、刪除操作對應的kafka訊息

增加操作: insert into hudi.hudisource values (1001,“蔣語堂”,38,“女”,“圖”,"《星球大戰》",28732);

對應kafka訊息體:

更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie’ WHERE uid=1001;

對應kafka訊息體:

刪除操作: delete from hudi.hudisource where uid=1001;

對應kafka訊息體:

Hudi

簡介

Apache Hudi是一個Data Lakes的開源方案,Hudi是Hadoop Updates and Incrementals的簡寫。具有以下的特性

  • ACID事務能力,支援實時入湖和批量入湖。
  • 多種檢視能力(讀優化檢視/增量檢視/實時檢視),支援快速資料分析。
  • MVCC設計,支援資料版本回溯。
  • 自動管理檔案大小和佈局,以優化查詢效能準實時攝取,為查詢提供最新資料。
  • 支援併發讀寫,基於snapshot的隔離機制實現寫入時可讀取。
  • 支援原地轉表,將存量的歷史錶轉換為Hudi資料集。

樣例程式碼解析

使用Hudi實時入湖的樣例程式碼分三個部分

  • Kafka資料消費
  • 資料內容解析、處理
  • 解析後資料的寫入

Kafka資料消費部分樣例程式碼:

String savePath = "hdfs://hacluster/huditest2/";
String groupId = "group1";
System.out.println("groupID is: " + groupId);
String brokerList = "172.16.5.51:21005";
System.out.println("brokerList is: " + brokerList);
String topic = "hudisource";
System.out.println("topic is: " + topic);
String interval = "5";

HashMap<String, Object> kafkaParam = new HashMap<>();
kafkaParam.put("bootstrap.servers", brokerList);
kafkaParam.put("group.id", groupId);
kafkaParam.put("auto.offset.reset", "earliest");
kafkaParam.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParam.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

HashSet<String> topics = new HashSet<>();
topics.add(topic);

String[] topicArray = {topic};
Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArray));
ConsumerStrategy consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParam);

//本地除錯
SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("hudi-java-demo");

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.streaming.kafka.maxRatePerPartition", "10");
conf.set("spark.streaming.backpressure.enabled", "true");

JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        consumerStrategy);

資料內容解析、處理部分樣例程式碼:

JavaDStream<List> lines =
        directStream.filter(
                //過濾空行和髒資料
                new Function<ConsumerRecord<String, String>, Boolean>() {
                    public Boolean call(ConsumerRecord<String, String> v1) throws Exception {                     
                        if (v1.value() == null) {
                            return false;
                        }
                        try{
                            String op = debeziumJsonParser.getOP(v1.value());
                        }catch (Exception e){
                            return false;
                        }
                        return true;
                    }
                }
        ).map(
                new Function<ConsumerRecord<String, String>, List>() {
                    public List call(ConsumerRecord<String, String> v1) throws Exception {
                        //將debezium接進來的資料解析寫進List                       
                        String op = debeziumJsonParser.getOP(v1.value());
                        JSONObject json_obj = JSON.parseObject(v1.value());                     
                        Boolean is_delete = false;
                        String out_str = "";
                        if(op.equals("c")){                            
                            out_str =  json_obj.getJSONObject("payload").get("after").toString();
                        }
                        else if(op.equals("u")){                            
                            out_str =   json_obj.getJSONObject("payload").get("after").toString();
                        }
                        else {
                            is_delete = true;
                            out_str =   json_obj.getJSONObject("payload").get("before").toString();
                        }
                        LinkedHashMap<String, String> jsonMap = JSON.parseObject(out_str, new TypeReference<LinkedHashMap<String, String>>() {
                        });
                        int cnt =0;
                        List out_list = new ArrayList();
                        for (Map.Entry<String, String> entry : jsonMap.entrySet()) {
                            out_list.add(entry.getValue());
                            cnt++;
                        }
                        out_list.add(is_delete);
                        String commitTime = Long.toString(System.currentTimeMillis());

                        out_list.add(commitTime);
                        System.out.println(out_list);

                        out_list.add(op);

                        return out_list;
                    }
                });

debezium更新欄位解析樣例程式碼:

public class debeziumJsonParser {
    public static String getOP(String message){
        JSONObject json_obj = JSON.parseObject(message);
        String op = json_obj.getJSONObject("payload").get("op").toString();
        return  op;
    }
}

解析後資料的寫入hudi表,hive表樣例程式碼:

lines.foreachRDD(new VoidFunction<JavaRDD<List>>() {
    @Override
    public void call(JavaRDD<List> stringJavaRDD) throws Exception {
        if (!stringJavaRDD.isEmpty()) {
            System.out.println("stringJavaRDD collect---"+stringJavaRDD.collect());
            List<Row> rowList =new ArrayList<>();
            //把資料上一步資料寫進stringJavaRdd
            for(List row: stringJavaRDD.collect()){

                String uid = row.get(0).toString();
                String name = row.get(1).toString();
                String age = row.get(2).toString();
                String sex = row.get(3).toString();
                String mostlike = row.get(4).toString();
                String lastview = row.get(5).toString();
                String totalcost = row.get(6).toString();
                Boolean _hoodie_is_deleted = Boolean.valueOf(row.get(7).toString());      
                String commitTime = row.get(8).toString();
                String op = row.get(9).toString();
                Row returnRow = RowFactory.create(uid, name, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, commitTime, op);
                rowList.add(returnRow);

            }
            JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);
            //寫入表的欄位schema設計
            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("uid", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));
            fields.add(DataTypes.createStructField("commitTime", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));
            StructType schema = DataTypes.createStructType(fields);
            Dataset<Row> dataFrame = sqlContext.createDataFrame(stringJavaRdd, schema);
            Dataset<Row> rowDataset = dataFrame.withColumn("ts", dataFrame.col("commitTime"))
                    .withColumn("uuid", dataFrame.col("uid"));        

            //將資料寫入hudi表以及hive表
            rowDataset.write().format("org.apache.hudi")
                    .option("PRECOMBINE_FIELD_OPT_KEY", "ts")
                    .option("RECORDKEY_FIELD_OPT_KEY", "uuid")
                    .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")                  
                    .option("hoodie.table.name", "huditesttable")
                    .option("hoodie.upsert.shuffle.parallelism", "10")                  
                    .option("hoodie.delete.shuffle.parallelism", "10")
                    .option("hoodie.insert.shuffle.parallelism", "10")
                    .option("hoodie.bulkinsert.shuffle.parallelism", "10")
                    .option("hoodie.finalize.write.parallelism", "10")
                    .option("hoodie.cleaner.parallelism", "10")
                    .option("hoodie.datasource.write.operation", "upsert")                  
                    .option("hoodie.datasource.hive_sync.enable", "true")                    
                    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor")
                    .option("hoodie.datasource.hive_sync.database", "default")
                    .option("hoodie.datasource.hive_sync.table", "hudidebezium")
                    .option("hoodie.datasource.hive_sync.use_jdbc", "false")
                    .mode(SaveMode.Append)
                    .save(savePath);
        }
    }
});

jssc.start();
jssc.awaitTermination();
jssc.close();

Hudi任務提交命令

source /opt/client/bigdata_env
source /opt/client/Hudi/component_env
spark-submit --master yarn --deploy-mode client --jars /opt/hudi-demo4/fastjson-1.2.4.jar --class hudiIn /opt/hudi-demo4/HudiJavaDemo-1.0-SNAPSHOT.jar

HetuEngine

簡介

HetuEngine是華為FusionInsight MRS提供的高效能分散式SQL查詢、資料虛擬化引擎。能與大資料生態無縫融合,實現海量資料秒級查詢;支援多源異構協同,提供資料湖內一站式SQL融合分析。

同時HetuEngine擁有開放的介面,能夠支援各報表、分析軟體對接,具體可參考生態地圖:https://fusioninsight.github.io/ecosystem/zh-hans/

下面我們以帆軟FineBI為例進行查詢、分析。

配置FineBI對接HetuEngine

JDBC URL: jdbc:presto://172.16.5.51:29860,172.16.5.52:29860/hive/default?serviceDiscoveryMode=hsbroker&tenant=default

檢視初始同步資料:

通過HetuEngine檢查增、改、刪除操作

Mysql增加操作對應hive表結果:

Mysql更改操作對應hive表結果:

Mysql刪除操作對應hive表結果:

報表:

電影喜愛度分析:

電影標籤喜愛度分析: