Reading and Writing Parquet, a Columnar Storage Format
Posted by 阿新 · 2021-01-02
Overview
Apache Parquet is a columnar storage format that any project in the Hadoop ecosystem can use, offering higher compression ratios and smaller I/O. Many articles online claim that writing Parquet requires a local Hadoop installation. Below is a way to write Parquet files without installing Hadoop, followed by two ways to read them back. Let's dive in…
Writing Parquet
1. pom dependencies
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.8.1</version>
</dependency>
2. Define the schema (entity class)
package com.kestrel;

// Entity class; the Avro schema is derived from it by reflection at write time.
public class User {
    private String id;
    private String name;
    private String password;

    public User() {
    }

    public User(String id, String name, String password) {
        this.id = id;
        this.name = name;
        this.password = password;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
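Since the writer below derives the Avro schema from this class by reflection, it can help to print the schema ReflectData actually generates before writing anything. A minimal sketch, assuming the same User class is on the classpath (the PrintUserSchema class name is made up for illustration):

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class PrintUserSchema {
    public static void main(String[] args) {
        // AllowNull wraps each field in a union with null, matching the
        // writer configuration used in the next section.
        Schema schema = ReflectData.AllowNull.get().getSchema(User.class);
        System.out.println(schema.toString(true)); // pretty-printed schema JSON
    }
}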
- Writing with AvroParquetWriter
List<User> users = new ArrayList<>();
User user1 = new User("1", "huangchixin", "123123");
User user2 = new User("2", "huangchixin2", "123445");
users.add(user1);
users.add(user2);

Path dataFile = new Path("./tmp/demo.snappy.parquet");

// Write as a Parquet file. SNAPPY and OVERWRITE are static imports of
// org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY and
// org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE.
try (ParquetWriter<User> writer = AvroParquetWriter.<User>builder(dataFile)
        .withSchema(ReflectData.AllowNull.get().getSchema(User.class))
        .withDataModel(ReflectData.get())
        .withConf(new Configuration())
        .withCompressionCodec(SNAPPY)
        .withWriteMode(OVERWRITE)
        .build()) {
    for (User user : users) {
        writer.write(user);
    }
}
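To sanity-check the write without deserializing any records, you can read back just the file footer. A small sketch against the parquet-hadoop 1.8.x API (variable names are illustrative):

import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// The footer holds the schema and per-row-group metadata; no record data is read.
ParquetMetadata footer = ParquetFileReader.readFooter(
        new Configuration(), dataFile, ParquetMetadataConverter.NO_FILTER);
long totalRows = 0;
for (BlockMetaData block : footer.getBlocks()) {
    totalRows += block.getRowCount();
}
System.out.println("schema: " + footer.getFileMetaData().getSchema());
System.out.println("rows: " + totalRows); // should print 2 for the example above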
Reading Parquet
- Reading with AvroParquetReader: the entity class (object type) must be specified; alternatively you can read generic records and build a custom JSON string, as sketched after the code below.
// Read from the Parquet file.
try (ParquetReader<User> reader = AvroParquetReader.<User>builder(dataFile)
        .withDataModel(new ReflectData(User.class.getClassLoader()))
        .disableCompatibility()
        .withConf(new Configuration())
        .build()) {
    User user;
    while ((user = reader.read()) != null) {
        System.out.println(user);
    }
}
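If you would rather not depend on the User class at read time, the same file can be read into Avro GenericRecord objects; each record's toString() renders JSON-like text, which is one way to get the custom JSON string mentioned above. A sketch, assuming the dataFile path from the writer example:

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Force the generic data model so no com.kestrel.User class is looked up.
try (ParquetReader<GenericRecord> reader = AvroParquetReader
        .<GenericRecord>builder(dataFile)
        .withDataModel(GenericData.get())
        .withConf(new Configuration())
        .build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {
        System.out.println(record); // e.g. {"id": "1", "name": "huangchixin", ...}
    }
}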
- Reading with ParquetFileReader, which does not need the entity class
- Column header entity
package com.kestrel;

/**
 * @Author: 12640
 * @Date: 2021/1/1 15:13
 * @Description: Column header entity.
 */
public class TableHead {
    /** Column name */
    private String name;
    /** Data type stored in the column */
    private String type;
    /** Column index */
    private Integer index;

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getIndex() {
        return index;
    }

    public void setIndex(Integer index) {
        this.index = index;
    }
}
- Parquet result entity class
package com.kestrel;

import java.util.List;

/**
 * @Author: 12640
 * @Date: 2021/1/1 15:14
 * @Description: Parsed table result.
 */
public class TableResult {
    /** Header information of the parsed file; for now only meaningful for arrow/csv-style tabular files */
    private List<TableHead> columns;
    /** Row data */
    private List<?> data;

    public List<TableHead> getColumns() {
        return columns;
    }

    public void setColumns(List<TableHead> columns) {
        this.columns = columns;
    }

    public List<?> getData() {
        return data;
    }

    public void setData(List<?> data) {
        this.data = data;
    }
}
- Reading the parquet file
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReadParquet {

    public static void main(String[] args) throws Exception {
        TableResult tableResult = parquetReaderV2(new File("./tmp/demo.snappy.parquet"));
        ObjectMapper mapper = new ObjectMapper();
        String jsonString = mapper.writerWithDefaultPrettyPrinter()
                .writeValueAsString(tableResult);
        System.out.println(jsonString);
    }

    public static TableResult parquetReaderV2(File file) throws Exception {
        long start = System.currentTimeMillis();
        hadoopEnv();
        Path path = new Path(file.getAbsolutePath());
        Configuration conf = new Configuration();
        TableResult table = new TableResult();
        // Two-dimensional list holding the row data
        List<List<Object>> dataList = Lists.newArrayList();
        ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path,
                ParquetMetadataConverter.NO_FILTER);
        MessageType schema = readFooter.getFileMetaData().getSchema();
        ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
                readFooter.getBlocks(), schema.getColumns());
        // With parquet 1.9.0, construct the reader like this instead:
        // ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
        PageReadStore pages = null;
        try {
            while (null != (pages = r.readNextRowGroup())) {
                final long rows = pages.getRowCount();
                // logger.info(file.getName() + " rows: " + rows);
                final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                final RecordReader<Group> recordReader = columnIO.getRecordReader(pages,
                        new GroupRecordConverter(schema));
                for (long i = 0; i < rows; i++) {
                    final Group g = recordReader.read();
                    if (i == 0) {
                        // Build the header column list from the first record
                        table.setColumns(parquetColumn(g));
                    }
                    // Collect the row data
                    dataList.add(getparquetData(table.getColumns(), g));
                    // printGroup(g);
                }
            }
        } finally {
            r.close();
        }
        // logger.info(file.getName() + " load time: " + (System.currentTimeMillis() - start));
        table.setData(dataList);
        return table;
    }

    // In newer versions all new ParquetReader(...) constructors are deprecated;
    // use the builder API instead (see the sketch after this class).
    static void parquetReader(String inPath) throws Exception {
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader<Group> reader = new ParquetReader<Group>(new Path(inPath), readSupport);
        Group line = null;
        while ((line = reader.read()) != null) {
            System.out.println(line.toString());
        }
        System.out.println("Done reading");
    }

    private static List<Object> getparquetData(List<TableHead> columns, Group line) {
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            Object cellStr = null;
            try {
                switch (columns.get(i).getType()) {
                    case "DOUBLE":
                        cellStr = line.getDouble(i, 0);
                        break;
                    case "FLOAT":
                        cellStr = line.getFloat(i, 0);
                        break;
                    case "BOOLEAN":
                        cellStr = line.getBoolean(i, 0);
                        break;
                    case "INT96":
                        cellStr = line.getInt96(i, 0);
                        break;
                    case "LONG":
                        cellStr = line.getLong(i, 0);
                        break;
                    default:
                        cellStr = line.getValueToString(i, 0);
                }
            } catch (RuntimeException e) {
                // A missing (null) value throws at read time; leave the cell as null.
            } finally {
                row.add(cellStr);
            }
        }
        return row;
    }

    /**
     * Extract the table header information from a record's group type.
     */
    private static List<TableHead> parquetColumn(Group line) {
        List<TableHead> columns = Lists.newArrayList();
        TableHead dto = null;
        GroupType groupType = line.getType();
        int fieldCount = groupType.getFieldCount();
        for (int i = 0; i < fieldCount; i++) {
            dto = new TableHead();
            Type type = groupType.getType(i);
            String fieldName = type.getName();
            OriginalType originalType = type.getOriginalType();
            String typeName = null;
            if (originalType != null) {
                typeName = originalType.name();
            } else {
                typeName = type.asPrimitiveType().getPrimitiveTypeName().name();
            }
            dto.setIndex(i);
            dto.setName(fieldName);
            dto.setType(typeName);
            columns.add(dto);
        }
        return columns;
    }

    // Point hadoop.home.dir at the working directory and create a stub winutils.exe
    // so Hadoop code runs on Windows without a real installation.
    public static void hadoopEnv() throws IOException {
        File workaround = new File(".");
        System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
        new File("./bin").mkdirs();
        new File("./bin/winutils.exe").createNewFile();
    }
}
Follow the WeChat official account 【Java搬磚小夥子】 for more resources.
Your support is the greatest motivation on my way forward, thank you!