首页 > 解决方案 > 在 Apache Beam 和内存错误中处理块中的文件

问题描述

我们在云存储位置有多个文件。我们正在从该位置读取文件,并根据文件类型处理文件并写入输出主题。写入输出主题是基于文件类型。这是代码

    PCollection<FileIO.ReadableFile> data = pipeline.
                        apply(FileIO.match().filepattern(options.getReadDir())
                        .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
                        .apply(FileIO.readMatches().withCompression(Compression.AUTO));
    
   
    
      PCollectionTuple outputData = data.apply(ParDo.of(new Transformer(tupleTagsMap, options.getBlockSize()))
                        .withOutputTags(TupleTags.customerTag, TupleTagList.of(tupleTagList))
                );
    
                outputData.get(TupleTags.topicA)
                        .apply("Write to customer topic",
                                PubsubIO.writeStrings().to(options.topicA));
    
                outputData.get(TupleTags.topicB)
                        .apply("Write to transaction topic",
                                PubsubIO.writeStrings().to(options.topicB));
    
    
      processContext.output(TupleTagsMap().get(importContext.getFileType().toString()), jsonBlock); This block of code is inside the transformer

这里的问题是其中一个文件非常大,它包含 1 亿条记录。

我们以块的形式添加到上面的 processContext.output 中,但是当它写入输出主题时,它是在整个文件处理完成时写入的。jsonBlock 是块之一。

因此,我们在处理大文件时遇到内存错误。原因是它没有被写入输出主题

如何解决这个问题?

标签: google-cloud-platformapache-beam

解决方案


推荐阅读