首页 > 解决方案 > 有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以编写 Parquet 文件?

问题描述

我正在尝试在 Beam/Java 中编写一个 Dataflow 作业来处理来自 Pub/Sub 并写入 Parquet 的一系列事件。Pub/Sub 中的事件采用 JSON 格式,每个事件都可以生成一行或多行。我能够编写一个非常简单的示例来编写仅返回 1 条记录的 ParDo 转换。ParDo 看起来像这样

    static class GenerateRecords extends DoFn<String, GenericRecord> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            final GenericData.Record record = new GenericData.Record(schema);
            String msg = context.element();

            com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);


            context.output(pRecord);
        }
    }

和管道的写入部分

                .apply("Write to file",
                FileIO.<GenericRecord>
                        write()
                        .via(
                                ParquetIO.sink(schema)
                                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        )
                        .to(options.getOutputDirectory())
                        .withNumShards(options.getNumShards())
                        .withSuffix("pfile")
                );

我的问题是,如何概括此 ParDo 转换以返回记录列表?我尝试了 List 但这不起作用,ParquetIO.sink(schema) 在“无法解析方法通过”处咆哮。

标签: javagoogle-cloud-dataflowapache-beamdataflow

解决方案


context.output()您可以DoFn根据需要多次调用。因此,如果您知道在什么情况下需要发出多条记录的业务逻辑,那么您只需要调用context.output(record)每条输出记录即可。它应该比拥有一个PCollection容器更简单。

PS:顺便说一句,我有一个简单的例子来说明如何写GenericRecords ,ParquetIOAvroCoder可能会有所帮助。


推荐阅读