java - 将 GenericRecords 的 pCollection 写入 Parquet 文件的数据流
问题描述
在 apache 梁步骤中,我有一个 PCollection KV<String, Iterable<KV<Long, GenericRecord>>>>
。我想将迭代中的所有记录写入同一个镶木地板文件。我的代码片段如下
p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord>
.apply(ParDo.of(new MapKafkaGenericRecordValue(formatter, options.getFileNameDelimiter()))) //PCollection<KV<String, KV<Long, GenericRecord>>>
.apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>
现在我想将 Iterable 中的所有记录写入同一个 parquet 文件中(通过 KV 的键导出文件名)。
解决方案
我找到了问题的解决方案。在步骤 -
apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>
我将应用另一个转换,它将只返回 Iterable 作为输出 pCollection。`.apply(ParDo.of(new GetIterable())) //PCollection>> 其中 key 是我必须写入的文件的名称。然后剩下的片段是
.apply(Flatten.iterables())
.apply(
FileIO.<String, KV<String, GenericRecord>>writeDynamic()
.by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
.via(
Contextful.fn(
(SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
),
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.withTempDirectory("/tmp/temp-beam")
.to(options.getGCSBucketUrl())
.withNumShards(1)
.withDestinationCoder(StringUtf8Coder.of())
)
推荐阅读
- database - 使用 UNWIND 和 CREATE 创建多个关系
- python - 如果值是整数,Python递归地添加到数字,否则如果值是字典,则添加该字典中的所有项目
- docker - ElasticBeanstalk nginx 在发送 Context-Length 标头时返回 400 Bad Request
- log4j2 - log4j2 将特定类的日志记录级别配置为全部,并将其余类限制为“错误”级别
- react-native - 创建 Axios 实例时,在我的情况下如何从异步存储中分配值
- python - Python - 为 PDF 文件的不同页面定义打印机托盘
- reactjs - 在 ApolloConsumer 的上下文中找不到客户端
- bash - 如果没有文件与 glob 匹配,则防止“mv”命令引发错误。例如" mv *.json /dir/
- sql - 我们可以在 oracle 数据库的一个单元格中有多个 '/' 分隔的值吗?是否违反任何规则?
- powershell - POWERSHELL - 如何将命令 qwinsta 转换为表对象