首页 > 解决方案 > 将 GenericRecords 的 pCollection 写入 Parquet 文件的数据流

问题描述

在 apache 梁步骤中,我有一个 PCollection KV<String, Iterable<KV<Long, GenericRecord>>>>。我想将迭代中的所有记录写入同一个镶木地板文件。我的代码片段如下

p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord>
.apply(ParDo.of(new MapKafkaGenericRecordValue(formatter, options.getFileNameDelimiter()))) //PCollection<KV<String, KV<Long, GenericRecord>>>
.apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>

现在我想将 Iterable 中的所有记录写入同一个 parquet 文件中(通过 KV 的键导出文件名)。

标签: javaapache-beamparquetdataflow

解决方案


我找到了问题的解决方案。在步骤 -

apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>

我将应用另一个转换,它将只返回 Iterable 作为输出 pCollection。`.apply(ParDo.of(new GetIterable())) //PCollection>> 其中 key 是我必须写入的文件的名称。然后剩下的片段是

.apply(Flatten.iterables())
                .apply(
                        FileIO.<String, KV<String, GenericRecord>>writeDynamic()
                                .by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
                                .via(
                                        Contextful.fn(
                                                (SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
                                        ),
                                        ParquetIO.sink(schema)
                                                .withCompressionCodec(CompressionCodecName.SNAPPY)


                                )

                                .withTempDirectory("/tmp/temp-beam")
                                .to(options.getGCSBucketUrl())
                                .withNumShards(1)
                                .withDestinationCoder(StringUtf8Coder.of())
                )

推荐阅读