首页 > 解决方案 > 如何使用 ParquetWriter 将 TIMESTAMP 逻辑类型(INT96)写入镶木地板?

问题描述

我有一个工具,它使用org.apache.parquet.hadoop.ParquetWriter将 CSV 数据文件转换为 parquet 数据文件。

目前,它只处理int32, double, 和string

我需要支持 parquettimestamp逻辑类型(注释为 int96),我不知道如何做到这一点,因为我在网上找不到精确的规范。

似乎这种时间戳编码(int96)很少见,并且没有得到很好的支持。我在网上找到的规格细节很少。这个 github README声明:

保存为 int96 的时间戳由当天的纳秒(前 8 个字节)和儒略日(后 4 个字节)组成。

具体来说:

  1. 我为MessageType模式中的列使用哪种镶木地板类型?我假设我应该使用原始类型,但我不确定是否有办法指定逻辑类型?PrimitiveTypeName.INT96
  2. 我如何写数据?即我以什么格式将时间戳写入组?对于 INT96 时间戳,我假设我必须编写一些二进制类型?

这是我的代码的简化版本,它演示了我正在尝试做的事情。具体来说,看一下“TODO”注释,这是代码中与上述问题相关的两点。

List<Type> fields = new ArrayList<>();
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.STRING, "string_col", null));

// TODO: 
//   Specify the TIMESTAMP type. 
//   How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col", null)); 

MessageType schema = new MessageType("input", fields);

// initialize writer
Configuration configuration = new Configuration();
configuration.setQuietMode(true);
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
  new Path("output.parquet"),
  new GroupWriteSupport(),
  CompressionCodecName.SNAPPY,
  ParquetWriter.DEFAULT_BLOCK_SIZE,
  ParquetWriter.DEFAULT_PAGE_SIZE,
  1048576,
  true,
  false,
  ParquetProperties.WriterVersion.PARQUET_1_0,
  configuration
);

// write CSV data
CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
ArrayList<String> columns = new ArrayList<>(schemaMap.keySet());
int colIndex;
int rowNum = 0;
for (CSVRecord csvRecord : parser) {
  rowNum ++;
  Group group = f.newGroup();
  colIndex = 0;
  for (String record : csvRecord) {
    if (record == null || record.isEmpty() || record.equals( "NULL")) {
      colIndex++;
      continue;
    }


    record = record.trim();
    String type = schemaMap.get(columns.get(colIndex)).get("type").toString();
    MessageTypeConverter.addTypeValueToGroup(type, record, group, colIndex++);

    switch (colIndex) {
      case 0: // int32
        group.add(colIndex, Integer.parseInt(record));
        break;
      case 1: // double
        group.add(colIndex, Double.parseDouble(record));
        break;
      case 2: // string
        group.add(colIndex, record);
        break;
      case 3:
        // TODO: convert CSV string value to TIMESTAMP type (how?)
        throw new NotImplementedException();
    }
  }
  writer.write(group);
}
writer.close();

标签: javaapache-sparkhadoopparquet

解决方案


  1. INT96 时间戳使用 INT96 物理类型,没有任何逻辑类型,所以不要用任何东西注释它们。
  2. 如果您对 INT96 时间戳的结构感兴趣,请查看此处。如果您想查看与此格式相互转换的示例代码,请查看Hive 中的此文件

推荐阅读