java - 有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以编写 Parquet 文件?
问题描述
我正在尝试在 Beam/Java 中编写一个 Dataflow 作业来处理来自 Pub/Sub 并写入 Parquet 的一系列事件。Pub/Sub 中的事件采用 JSON 格式,每个事件都可以生成一行或多行。我能够编写一个非常简单的示例来编写仅返回 1 条记录的 ParDo 转换。ParDo 看起来像这样
static class GenerateRecords extends DoFn<String, GenericRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
final GenericData.Record record = new GenericData.Record(schema);
String msg = context.element();
com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);
context.output(pRecord);
}
}
和管道的写入部分
.apply("Write to file",
FileIO.<GenericRecord>
write()
.via(
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.to(options.getOutputDirectory())
.withNumShards(options.getNumShards())
.withSuffix("pfile")
);
我的问题是,如何概括此 ParDo 转换以返回记录列表?我尝试了 List 但这不起作用,ParquetIO.sink(schema) 在“无法解析方法通过”处咆哮。
解决方案
context.output()
您可以DoFn
根据需要多次调用。因此,如果您知道在什么情况下需要发出多条记录的业务逻辑,那么您只需要调用context.output(record)
每条输出记录即可。它应该比拥有一个PCollection
容器更简单。
PS:顺便说一句,我有一个简单的例子来说明如何写GenericRecord
s ,ParquetIO
这AvroCoder
可能会有所帮助。
推荐阅读
- python - 如果包含其他 Python 脚本的子进程,我如何制作一个独立的 Python exe?
- php - 使用 Slim Framework for PHP 获取帖子数据
- java - java.xml.soap._ 错误:对象soap不是包javax.xml的成员
- python - 扩展方法时将局部变量保留在范围内
- python - 如何在不更改索引的情况下从熊猫中采样并保留剩余样本
- flutter - 如何在颤动中从地图中选择地址
- qt - QNetworkAccessManager 不调用 QNetworkReply
- javascript - 尝试对要显示的数据库中保存的数据实施拒绝/接受。反应
- makefile - 当先决条件没有后缀或前缀时,make 无法确定隐式规则?
- c++ - 将指针传递给函数中的指针时地址如何更改