google-cloud-platform - 在 Apache Beam 和内存错误中处理块中的文件
问题描述
我们在云存储位置有多个文件。我们正在从该位置读取文件,并根据文件类型处理文件并写入输出主题。写入输出主题是基于文件类型。这是代码
PCollection<FileIO.ReadableFile> data = pipeline.
apply(FileIO.match().filepattern(options.getReadDir())
.continuously(Duration.standardSeconds(10), Watch.Growth.never()))
.apply(FileIO.readMatches().withCompression(Compression.AUTO));
PCollectionTuple outputData = data.apply(ParDo.of(new Transformer(tupleTagsMap, options.getBlockSize()))
.withOutputTags(TupleTags.customerTag, TupleTagList.of(tupleTagList))
);
outputData.get(TupleTags.topicA)
.apply("Write to customer topic",
PubsubIO.writeStrings().to(options.topicA));
outputData.get(TupleTags.topicB)
.apply("Write to transaction topic",
PubsubIO.writeStrings().to(options.topicB));
processContext.output(TupleTagsMap().get(importContext.getFileType().toString()), jsonBlock); This block of code is inside the transformer
这里的问题是其中一个文件非常大,它包含 1 亿条记录。
我们以块的形式添加到上面的 processContext.output 中,但是当它写入输出主题时,它是在整个文件处理完成时写入的。jsonBlock 是块之一。
因此,我们在处理大文件时遇到内存错误。原因是它没有被写入输出主题
如何解决这个问题?
解决方案
推荐阅读
- r - R中的时间序列标签
- google-app-engine - Google Cloud Endpoints 和 App Engine 的延迟在 1 到 2 秒之间是否正常?
- python - Google Cloud ML-Engine 在线预测错误:为已包含的占位符张量提供值
- java - FileOutputStream try-with-resources 不关闭文件描述符
- django - 使用 UniqueConstraint 解决 Django admin 中的“get() 返回多个”错误
- sas - SAS,转置表
- c - 如何使用 OpenMP 并行化嵌套的 FOR 循环
- micronaut - 我可以将 Apache Camel 与 Micronaut 一起使用吗?
- sql - 如何在没有重复礼物的情况下计算礼物金额
- java - 在 Kotlin 中启用 CORS 时,Spark 在访问路由时返回空响应