Issue with ParquetWriter in Java

Problem Description

I am trying to write a parquet file using ParquetWriter; my POC code is shared below. The end goal of the POC is to convert the BINARY type to FIXED_LEN_BYTE_ARRAY for decimal columns.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.schema.DecimalMetadata;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;
    import org.json.JSONObject;

    // Schema: an optional UTF8 string column and a decimal column that is
    // still declared BINARY here (the goal is FIXED_LEN_BYTE_ARRAY).
    Types.MessageTypeBuilder builder = Types.buildMessage();
    builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL,
            PrimitiveType.PrimitiveTypeName.BINARY, "legalName",
            LogicalTypeAnnotation.stringType().toOriginalType()));
    // decimalType(scale, precision): scale 9, precision 32, matching
    // DecimalMetadata(precision, scale).
    builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL,
            PrimitiveType.PrimitiveTypeName.BINARY, 64, "vatPercentage",
            LogicalTypeAnnotation.decimalType(9, 32).toOriginalType(),
            new DecimalMetadata(32, 9), new Type.ID(64)));
    MessageType schema = builder.named(schemaName);

    GroupWriteSupport.setSchema(schema, writeConfiguration);
    GroupWriteSupport groupWriteSupport = new GroupWriteSupport();
    groupWriteSupport.init(writeConfiguration);

    ParquetWriter<Group> writer = new ParquetWriter<>(new Path("file.parquet"),
            groupWriteSupport, CompressionCodecName.SNAPPY,
            ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
            ParquetWriter.DEFAULT_PAGE_SIZE,
            ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
            ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
            ParquetWriter.DEFAULT_WRITER_VERSION);

    SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
    ParquetReader<GenericRecord> reader =
            AvroParquetReader.<GenericRecord>builder(new Path(filePath)).build();
    GenericRecord record;
    // Copy each Avro record into an example Group and write it out.
    while ((record = reader.read()) != null) {
        Group group = simpleGroupFactory.newGroup();
        for (Schema.Field field : record.getSchema().getFields()) {
            // Each Avro field is a union like ["null", <type>]; look at the non-null branch.
            String dataType = field.schema().getTypes().get(1).toString();
            if (dataType.equalsIgnoreCase("\"string\"")) {
                group.add(field.name(),
                        record.get(field.name()) != null ? record.get(field.name()).toString() : "null");
            } else if (dataType.contains("{")) {
                JSONObject object = new JSONObject(dataType);
                String type = (String) object.get("type");
                if (type.equalsIgnoreCase("bytes")) {
                    // This line triggers the exception below: a Double is
                    // added to the BINARY decimal column.
                    group.add(field.name(),
                            record.get(field.name()) != null ? (Double) record.get(field.name()) : 0.0);
                }
            }
        }
        writer.write(group); // was outside the loop, where group is out of scope
    }
    writer.close();
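
For reference, the declaration I am ultimately trying to produce is a FIXED_LEN_BYTE_ARRAY decimal. A minimal sketch with the Types builder (the 14-byte length is just one valid choice for precision 32, since ceil((32 * log2(10) + 1) / 8) = 14):

    // Sketch of the target field: DECIMAL(32, 9) stored as FIXED_LEN_BYTE_ARRAY.
    MessageType target = Types.buildMessage()
            .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
                .length(14)
                .as(LogicalTypeAnnotation.decimalType(9, 32)) // scale 9, precision 32
                .named("vatPercentage")
            .named("poc");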

Running the POC above, I hit the following exception. I am not sure what value I should be writing when casting:

Exception in thread "main" java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter
    at org.apache.parquet.column.values.ValuesWriter.writeDouble(ValuesWriter.java:124)
    at org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeDouble(FallbackValuesWriter.java:192)
    at org.apache.parquet.column.impl.ColumnWriterBase.write(ColumnWriterBase.java:138)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:496)
    at org.apache.parquet.example.data.simple.DoubleValue.writeValue(DoubleValue.java:38)
    at org.apache.parquet.example.data.simple.SimpleGroup.writeValue(SimpleGroup.java:229)
    at org.apache.parquet.example.data.GroupWriter.writeGroup(GroupWriter.java:51)
    at org.apache.parquet.example.data.GroupWriter.write(GroupWriter.java:37)
    at org.apache.parquet.hadoop.example.GroupWriteSupport.write(GroupWriteSupport.java:87)
    at org.apache.parquet.hadoop.example.GroupWriteSupport.write(GroupWriteSupport.java:37)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)

I also tried using AvroParquetWriter, but I could not convert the BINARY type to FIXED_LEN_BYTE_ARRAY for decimals there either.
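
From what I understand, with parquet-avro the way to land on FIXED_LEN_BYTE_ARRAY is to use an Avro fixed type carrying the decimal logical type, since AvroParquetWriter stores Avro fixed as Parquet FIXED_LEN_BYTE_ARRAY. A sketch of the field schema (the name, namespace, and 14-byte size are illustrative):

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    // Sketch: Avro "fixed" + decimal logical type should come out as
    // FIXED_LEN_BYTE_ARRAY annotated with DECIMAL(32, 9) on the Parquet side.
    Schema vatPercentage = LogicalTypes.decimal(32, 9)
            .addToSchema(Schema.createFixed("vatPercentage", null, "poc", 14));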

Related question: Issue with AvroParquetWriter using Java

Tags: parquet

Solution
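
The UnsupportedOperationException comes from a type mismatch, not from the writer setup: group.add(field.name(), (Double) ...) stores a DoubleValue, so Parquet calls writeDouble on the vatPercentage column, but that column is BINARY with a DECIMAL annotation, and its values writer (the PlainBinaryDictionaryValuesWriter in the trace) only accepts binary values. Per the Parquet format spec, a DECIMAL column stores the big-endian two's-complement bytes of the unscaled value, wrapped in a Binary; for FIXED_LEN_BYTE_ARRAY the bytes must additionally be sign-extended to the declared length. A minimal sketch of that encoding (toFixedDecimal is a hypothetical helper, not a Parquet API):

    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.util.Arrays;
    import org.apache.parquet.io.api.Binary;

    // Hypothetical helper: encode a BigDecimal as the big-endian
    // two's-complement bytes of its unscaled value, sign-extended to the
    // fixed length declared in the schema.
    static Binary toFixedDecimal(BigDecimal value, int scale, int fixedLength) {
        byte[] unscaled = value.setScale(scale, RoundingMode.UNNECESSARY)
                .unscaledValue()
                .toByteArray();
        if (unscaled.length > fixedLength) {
            throw new IllegalArgumentException(
                    value + " does not fit in " + fixedLength + " bytes");
        }
        byte[] padded = new byte[fixedLength];
        // Sign-extend: leading bytes are 0xFF for negative values, 0x00 otherwise.
        byte pad = (byte) (value.signum() < 0 ? 0xFF : 0x00);
        Arrays.fill(padded, 0, fixedLength - unscaled.length, pad);
        System.arraycopy(unscaled, 0, padded, fixedLength - unscaled.length, unscaled.length);
        return Binary.fromConstantByteArray(padded);
    }

With that, the bytes branch of the copy loop would add a Binary instead of a Double, e.g. group.add(field.name(), toFixedDecimal(new BigDecimal(record.get(field.name()).toString()), 9, 14)), with the column declared as FIXED_LEN_BYTE_ARRAY(14) as in the Types sketch above. A plain BINARY decimal column takes the same unscaled bytes without the sign-extension padding; either way, the value handed to the group must be a Binary, never a Double.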

