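parquet - Problem with ParquetWriter in Java
Problem description
I am trying to write a Parquet file with ParquetWriter; the proof-of-concept code is below. The end goal of the PoC is to store decimal values as FIXED_LEN_BYTE_ARRAY instead of BINARY.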
    Types.MessageTypeBuilder builder = Types.buildMessage();
    builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.BINARY, "legalName", LogicalTypeAnnotation.stringType().toOriginalType()));
    // LogicalTypeAnnotation.decimalType takes (scale, precision); DecimalMetadata takes (precision, scale).
    builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.BINARY, 64, "vatPercentage", LogicalTypeAnnotation.decimalType(9, 32).toOriginalType(), new DecimalMetadata(32, 9), new Type.ID(64)));
    MessageType schema = builder.named(schemaName);

    GroupWriteSupport.setSchema(schema, writeConfiguration);
    GroupWriteSupport groupWriteSupport = new GroupWriteSupport();
    groupWriteSupport.init(writeConfiguration);
    ParquetWriter<Group> writer = new ParquetWriter<>(new Path("file.parquet"),
            groupWriteSupport, CompressionCodecName.SNAPPY,
            ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
            ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION);
    SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);

    ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(filePath)).build();
    GenericRecord record;
    // write data
    while ((record = reader.read()) != null) {
        Group group = simpleGroupFactory.newGroup();
        for (Schema.Field field : record.getSchema().getFields()) {
            // Assumes every field is a union of the form [null, actualType].
            String dataType = field.schema().getTypes().get(1).toString();
            if (dataType.equalsIgnoreCase("\"string\"")) {
                group.add(field.name(), record.get(field.name()) != null ? record.get(field.name()).toString() : "null");
            }
            else if (dataType.contains("{")) {
                JSONObject object = new JSONObject(dataType);
                String type = (String) object.get("type");
                if (type.equalsIgnoreCase("bytes")) {
                    // This adds a double to a BINARY column, which is what raises the UnsupportedOperationException below.
                    group.add(field.name(), record.get(field.name()) != null ? (Double) record.get(field.name()) : 0.0);
                }
            }
        }
        // Write one group per input record, inside the read loop.
        writer.write(group);
    }
    writer.close();
I get the exception below. I am not sure what to cast the value to when adding it to the group.
Exception in thread "main" java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter
at org.apache.parquet.column.values.ValuesWriter.writeDouble(ValuesWriter.java:124)
at org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeDouble(FallbackValuesWriter.java:192)
at org.apache.parquet.column.impl.ColumnWriterBase.write(ColumnWriterBase.java:138)
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:496)
at org.apache.parquet.example.data.simple.DoubleValue.writeValue(DoubleValue.java:38)
at org.apache.parquet.example.data.simple.SimpleGroup.writeValue(SimpleGroup.java:229)
at org.apache.parquet.example.data.GroupWriter.writeGroup(GroupWriter.java:51)
at org.apache.parquet.example.data.GroupWriter.write(GroupWriter.java:37)
at org.apache.parquet.hadoop.example.GroupWriteSupport.write(GroupWriteSupport.java:87)
at org.apache.parquet.hadoop.example.GroupWriteSupport.write(GroupWriteSupport.java:37)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
I also tried AvroParquetWriter, but could not get it to write the decimal as FIXED_LEN_BYTE_ARRAY instead of BINARY.
Solution
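The stack trace shows a double being written to a column declared as BINARY. For a DECIMAL column, Parquet stores the unscaled integer value as big-endian two's-complement bytes, and a FIXED_LEN_BYTE_ARRAY column needs those bytes sign-extended to exactly the declared length. The following is a minimal JDK-only sketch of that conversion, not a confirmed fix for the code above. The class name DecimalBytes and both helper methods are hypothetical, and it assumes the value from the Avro record has already been turned into a BigDecimal.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Arrays;

// Hypothetical helper class for illustration; it is not part of Parquet or Avro.
public class DecimalBytes {

    /** Smallest byte count whose signed range can hold any decimal of the given precision. */
    public static int minBytesForPrecision(int precision) {
        BigInteger max = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
        // bitLength() excludes the sign bit, so add one bit before rounding up to whole bytes.
        return (max.bitLength() + 1 + 7) / 8;
    }

    /** Encodes the unscaled value as big-endian two's complement, sign-extended to `length` bytes. */
    public static byte[] toFixedLenBytes(BigDecimal value, int scale, int length) {
        // UNNECESSARY makes setScale throw ArithmeticException rather than round silently.
        BigInteger unscaled = value.setScale(scale, RoundingMode.UNNECESSARY).unscaledValue();
        byte[] minimal = unscaled.toByteArray(); // big-endian, shortest possible form
        if (minimal.length > length) {
            throw new IllegalArgumentException(
                    "Value needs " + minimal.length + " bytes but the column holds " + length);
        }
        byte[] out = new byte[length];
        // Pad on the left with 0xFF for negative values and 0x00 otherwise.
        byte pad = (byte) (unscaled.signum() < 0 ? 0xFF : 0x00);
        Arrays.fill(out, 0, length - minimal.length, pad);
        System.arraycopy(minimal, 0, out, length - minimal.length, minimal.length);
        return out;
    }

    public static void main(String[] args) {
        int precision = 32;
        int scale = 9;
        int length = minBytesForPrecision(precision);
        byte[] bytes = toFixedLenBytes(new BigDecimal("19.5"), scale, length);
        System.out.println(length + " bytes: " + Arrays.toString(bytes));
    }
}
```

With the schema field declared as FIXED_LEN_BYTE_ARRAY of that length rather than BINARY, the resulting array could then be wrapped with Binary.fromConstantByteArray(bytes) and passed to group.add(field.name(), binary) in place of the Double. Whether this matches the layout of the input Avro file is an assumption that should be checked against the actual data.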