java解析Parquet檔案
阿新 • 發佈:2019-01-29
獲取 Parquet 檔案,解析為 Map<String, List<String[]>>(含列頭與資料列)
package com.emcc.hiacloud.analytics.common.util;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Utility for reading a Parquet file through the Avro reader and exposing
 * its column header and row data as string arrays.
 */
public final class ParquetUtils {

    private static final String CSV_DELIMITER = ",";

    private ParquetUtils() {
        // Utility class — not instantiable.
    }

    /**
     * Reads up to {@code maxLine} records from the Parquet file at {@code path}.
     *
     * @param path    filesystem/HDFS path of the Parquet file
     * @param maxLine maximum number of data rows to read
     * @return a map with key {@code "head"} mapping to a single-element list that
     *         holds the column names, and key {@code "data"} mapping to the row
     *         values (one {@code String[]} per record, in field order);
     *         {@code "head"} is absent when the file contains no records
     * @throws IOException if the file cannot be opened or read
     */
    public static Map<String, List<String[]>> viewParquet(String path, int maxLine) throws IOException {
        Map<String, List<String[]>> parquetInfo = new HashMap<>();
        List<String[]> dataList = new ArrayList<>();
        String[] fieldNames = null;
        try (ParquetReader<GenericData.Record> reader =
                AvroParquetReader.<GenericData.Record>builder(new Path(path)).build()) {
            int rowCount = 0;
            GenericData.Record record;
            // Check the row limit BEFORE reading, so we never pull an extra
            // record past maxLine (the original read one record too many).
            while (rowCount < maxLine && (record = reader.read()) != null) {
                // First record: derive the column header from the schema.
                if (fieldNames == null) {
                    final List<Schema.Field> fieldsList = record.getSchema().getFields();
                    fieldNames = getFieldNames(fieldsList.toArray(new Schema.Field[0]));
                    System.out.println("列頭:" + String.join(CSV_DELIMITER, fieldNames));
                    dataList.add(fieldNames);
                    parquetInfo.put("head", dataList);
                    dataList = new ArrayList<>();
                }
                final String[] rowValues = new String[fieldNames.length];
                int column = 0;
                // Convert each field value of the current record to a string.
                for (final String fieldName : fieldNames) {
                    rowValues[column++] = formatFieldValue(record.get(fieldName));
                }
                dataList.add(rowValues);
                ++rowCount;
            }
        }
        parquetInfo.put("data", dataList);
        return parquetInfo;
    }

    /**
     * Converts a single record value to its string form, unwrapping nested
     * list/struct values that serialize as JSON.
     *
     * @param value raw Avro field value; may be {@code null}
     * @return the display string ("" for null values)
     */
    private static String formatFieldValue(final Object value) {
        // Null-safe: the original called value.toString() and NPE'd on null fields.
        if (value == null) {
            return "";
        }
        String recordData = value.toString();
        // Heuristic carried over from the original: a serialized nested value
        // contains a "type" key; unwrap {"values":[{"element":...}, ...]} into
        // a comma-joined string. NOTE(review): substring match on "type" will
        // also fire on plain string data containing that word — confirm intent.
        if (recordData.contains("type")) {
            final List<HashMap> elements = JSONArray.parseArray(
                    JSONObject.parseObject(recordData).get("values").toString(), HashMap.class);
            final StringBuilder joined = new StringBuilder();
            for (final HashMap element : elements) {
                // Prepend the delimiter between items; avoids the original
                // deleteCharAt call, which threw on an empty "values" array.
                if (joined.length() > 0) {
                    joined.append(CSV_DELIMITER);
                }
                joined.append(element.get("element"));
            }
            recordData = joined.toString();
        }
        return recordData;
    }

    /** Extracts the field names, in order, from an Avro schema field array. */
    private static String[] getFieldNames(final Schema.Field[] fields) {
        final String[] fieldNames = new String[fields.length];
        int i = 0;
        for (final Schema.Field field : fields) {
            fieldNames[i++] = field.name();
        }
        return fieldNames;
    }
}