【Java】Reading and Writing Parquet, a Columnar Storage Format

Overview

Apache Parquet is a columnar storage format that any project in the Hadoop ecosystem can use, offering higher compression ratios and fewer IO operations. Many articles online say that writing Parquet requires a local Hadoop installation. Below is a way to write Parquet files without installing Hadoop, followed by two ways to read the files back. Let's dive in...


Writing Parquet

1. POM dependencies

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.8.1</version>
</dependency>

2. Define the schema (entity class)

package com.kestrel;

public class User {

    private String id;
    private String name;
    private String password;

    public User() {
    }

    public User(String id, String name, String password) {
        this.id = id;
        this.name = name;
        this.password = password;
    }

    public String getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
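AvroParquetWriter (used in the next step) derives an Avro schema from this class by reflection. If you want to inspect what that schema looks like before writing anything, here is a minimal sketch (the SchemaCheck class name is ours, not from the original post):

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class SchemaCheck {
    public static void main(String[] args) {
        // AllowNull wraps every field in a union with null, i.e. nullable columns.
        Schema schema = ReflectData.AllowNull.get().getSchema(User.class);
        System.out.println(schema.toString(true)); // pretty-printed Avro schema JSON
    }
}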

3. Writing with AvroParquetWriter

import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;

List<User> users = new ArrayList<>();
User user1 = new User("1", "huangchixin", "123123");
User user2 = new User("2", "huangchixin2", "123445");
users.add(user1);
users.add(user2);

Path dataFile = new Path("./tmp/demo.snappy.parquet");

// Write as Parquet file.
try (ParquetWriter<User> writer = AvroParquetWriter.<User>builder(dataFile)
        .withSchema(ReflectData.AllowNull.get().getSchema(User.class)) // nullable fields
        .withDataModel(ReflectData.get())                              // write POJOs via reflection
        .withConf(new Configuration())
        .withCompressionCodec(SNAPPY)
        .withWriteMode(OVERWRITE)
        .build()) {
    for (User user : users) {
        writer.write(user);
    }
}
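The entity class is optional for writing, too: the same builder accepts Avro GenericRecords with a schema parsed from a JSON string. A hedged sketch (the output path and field values here are ours):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;

// Parse the schema from a JSON string instead of deriving it from a class.
Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"},"
      + "{\"name\":\"name\",\"type\":\"string\"},"
      + "{\"name\":\"password\",\"type\":\"string\"}]}");

GenericRecord record = new GenericData.Record(schema);
record.put("id", "3");
record.put("name", "generic");
record.put("password", "123456");

try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(new Path("./tmp/generic.snappy.parquet"))
        .withSchema(schema)
        .withCompressionCodec(SNAPPY)
        .build()) {
    writer.write(record);
}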

Reading Parquet

1. Reading with AvroParquetReader: you have to point it at the target class, or alternatively supply the schema yourself as a custom JSON string (a GenericRecord variant is sketched after the code).

import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

// Read from Parquet file.
try (ParquetReader<User> reader = AvroParquetReader.<User>builder(dataFile)
        .withDataModel(new ReflectData(User.class.getClassLoader())) // map records back onto User
        .disableCompatibility()
        .withConf(new Configuration())
        .build()) {
    User user;
    while ((user = reader.read()) != null) {
        System.out.println(user);
    }
}
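If you would rather not depend on the User class at all, force the generic data model and each record comes back as a GenericRecord. A sketch, assuming the file written earlier:

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

// withDataModel(GenericData.get()) keeps records generic instead of
// mapping them onto a reflected POJO.
try (ParquetReader<GenericRecord> reader = AvroParquetReader
        .<GenericRecord>builder(new Path("./tmp/demo.snappy.parquet"))
        .withDataModel(GenericData.get())
        .build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {
        System.out.println(record.get("id") + " / " + record.get("name"));
    }
}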

2. Reading with ParquetFileReader: no entity class required.

      • 列实体

package com.kestrel;

/**
 * @Author: 12640
 * @Date: 2021/1/1 15:13
 * @Description: describes one column of the table
 */
public class TableHead {

    /** column name */
    private String name;

    /** data type stored in the column */
    private String type;

    /** column index */
    private Integer index;

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getIndex() {
        return index;
    }

    public void setIndex(Integer index) {
        this.index = index;
    }
}

• parquet result entity

package com.kestrel;

import java.util.List;

/**
 * @Author: 12640
 * @Date: 2021/1/1 15:14
 * @Description: table-shaped result, header plus row data
 */
public class TableResult {

    /**
     * Header information of the parsed file (for now only used for
     * arrow/csv-style files).
     */
    private List<TableHead> columns;

    /** row data */
    private List<?> data;

    public List<TableHead> getColumns() {
        return columns;
    }

    public void setColumns(List<TableHead> columns) {
        this.columns = columns;
    }

    public List<?> getData() {
        return data;
    }

    public void setData(List<?> data) {
        this.data = data;
    }
}

• Reading the parquet file

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReadParquet {

    public static void main(String[] args) throws Exception {
        TableResult tableResult = parquetReaderV2(new File("./tmp/demo.snappy.parquet"));
        ObjectMapper mapper = new ObjectMapper();
        String jsonString = mapper.writerWithDefaultPrettyPrinter()
                .writeValueAsString(tableResult);
        System.out.println(jsonString);
    }

    public static TableResult parquetReaderV2(File file) throws Exception {
        long start = System.currentTimeMillis();
        hadoopEnv();
        Path path = new Path(file.getAbsolutePath());
        Configuration conf = new Configuration();
        TableResult table = new TableResult();
        // two-dimensional list holding the row data
        List<List<Object>> dataList = Lists.newArrayList();
        ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
        MessageType schema = readFooter.getFileMetaData().getSchema();
        ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
                readFooter.getBlocks(), schema.getColumns());
        // In 1.9.0, create the reader like this instead:
        // ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
        PageReadStore pages = null;
        try {
            while (null != (pages = r.readNextRowGroup())) {
                final long rows = pages.getRowCount();
                // logger.info(file.getName() + " rows: " + rows);
                final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                final RecordReader<Group> recordReader = columnIO.getRecordReader(pages,
                        new GroupRecordConverter(schema));
                for (int i = 0; i < rows; i++) {
                    // System.out.println(recordReader.shouldSkipCurrentRecord());
                    final Group g = recordReader.read();
                    if (i == 0) {
                        // derive the header (column names and types) from the first record
                        table.setColumns(parquetColumn(g));
                    }
                    // collect the row data
                    List<Object> row = getParquetData(table.getColumns(), g);
                    dataList.add(row);
                    // printGroup(g);
                }
            }
        } finally {
            r.close();
        }
        // logger.info(file.getName() + " load time: " + (System.currentTimeMillis() - start));
        table.setData(dataList);
        return table;
    }

    // In newer versions all of the ParquetReader constructors are deprecated;
    // use the builder instead (see the sketch below).
    static void parquetReader(String inPath) throws Exception {
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader<Group> reader = new ParquetReader<Group>(new Path(inPath), readSupport);
        Group line = null;
        while ((line = reader.read()) != null) {
            System.out.println(line.toString());
        }
        System.out.println("done reading");
    }
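    // A sketch of the builder-based equivalent (the method name is ours, not
    // from the original post). ParquetReader.builder(ReadSupport, Path) avoids
    // the deprecated constructor and lets try-with-resources close the reader.
    static void parquetReaderWithBuilder(String inPath) throws Exception {
        try (ParquetReader<Group> reader =
                ParquetReader.builder(new GroupReadSupport(), new Path(inPath)).build()) {
            Group line;
            while ((line = reader.read()) != null) {
                System.out.println(line.toString());
            }
        }
    }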

    private static List<Object> getParquetData(List<TableHead> columns, Group line) {
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            Object cellStr = null;
            try {
                switch (columns.get(i).getType()) {
                    case "DOUBLE":
                        cellStr = line.getDouble(i, 0);
                        break;
                    case "FLOAT":
                        cellStr = line.getFloat(i, 0);
                        break;
                    case "BOOLEAN":
                        cellStr = line.getBoolean(i, 0);
                        break;
                    case "INT96":
                        cellStr = line.getInt96(i, 0);
                        break;
                    case "INT64": // the primitive type name is INT64, not LONG
                        cellStr = line.getLong(i, 0);
                        break;
                    default:
                        cellStr = line.getValueToString(i, 0);
                }
            } catch (RuntimeException e) {
                // a missing (null) value throws; leave the cell as null
            } finally {
                row.add(cellStr);
            }
        }
        return row;
    }

    /**
     * Derive the table header (column name, type, and index) from a parquet record.
     */
    private static List<TableHead> parquetColumn(Group line) {
        List<TableHead> columns = Lists.newArrayList();
        TableHead dto = null;
        GroupType groupType = line.getType();
        int fieldCount = groupType.getFieldCount();
        for (int i = 0; i < fieldCount; i++) {
            dto = new TableHead();
            Type type = groupType.getType(i);
            String fieldName = type.getName();
            OriginalType originalType = type.getOriginalType();
            String typeName = null;
            if (originalType != null) {
                // logical type annotation, e.g. UTF8
                typeName = originalType.name();
            } else {
                // fall back to the physical type, e.g. INT64, DOUBLE
                typeName = type.asPrimitiveType().getPrimitiveTypeName().name();
            }
            dto.setIndex(i);
            dto.setName(fieldName);
            dto.setType(typeName);
            columns.add(dto);
        }
        return columns;
    }

    public static void hadoopEnv() throws IOException {
        // Windows workaround: point hadoop.home.dir at the working directory and
        // create an empty bin/winutils.exe so the Hadoop client libraries start
        // without a real Hadoop installation.
        File workaround = new File(".");
        System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
        new File("./bin").mkdirs();
        new File("./bin/winutils.exe").createNewFile();
    }
}
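Running main() against the two-row file written earlier should print JSON roughly like the following (property order is Jackson's choice, and both columns surface as UTF8 because the reflected Avro schema stores the strings as UTF8-annotated binary):

{
  "columns" : [
    { "name" : "id", "type" : "UTF8", "index" : 0 },
    { "name" : "name", "type" : "UTF8", "index" : 1 },
    { "name" : "password", "type" : "UTF8", "index" : 2 }
  ],
  "data" : [
    [ "1", "huangchixin", "123123" ],
    [ "2", "huangchixin2", "123445" ]
  ]
}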

Follow the WeChat official account 【Java搬砖小伙子】 for more resources.
Your support is the biggest motivation on my way forward, thank you!


