首页 > 解决方案 > 将两个通用记录合并为一个并将其写入镶木地板文件

问题描述

在全局范围内,我们的想法是读取一个 parquet 文件(返回 PCollection)根据我们读取的文件创建新的 PCollection,将这两个 PCollection 合并为一个并将其写入其他位置的 parquet 文件中。

我正在 SparkRunner 上尝试此代码。

我不能发布完整的代码,但我会尝试指出逻辑。

这是一个代码片段:

PCollection<GenericRecord> oldGen = pipeline.apply(ParquetIO.read(SCHEMA).from(/path);
PCollection<GenericRecord> newGen = processNew(); //in this part I am making new PCollection
PCollectionList<GenericRecord> pList = PCollectionList.of(oldGen).and(newGen);

pList
     .apply(Flatten.pCollections())
     .setCoder(AvroCoder.of(GenericRecord.class, SCHEMA))
     .apply(FileIO.<GenericRecord>write()
         .via(ParquetIO.sink(SCHEMA)).to(/outputlocation));

当我检查输出位置时,我只得到应该展平的两个 PCollection(没有规则是哪一个)。我尝试在展平集合后进行一些打印,打印看起来很好,我也尝试将这些数据写入 .txt 格式,并且效果也很好。

此外,我尝试了一种愚蠢的解决方案,令我惊讶的是,它以某种方式通过了。oldGen 参数有一列带有布尔标志。我已经将该 pcollection 转换为两个集合,一个带有 true 标志,另一个带有 false 标志。

PCollectionList 的片段如下所示:

PCollectionList<GenericRecord> pList = PCollectionList.of(newGen).and(trueGen).and(falseGen);

写入 parquet 看起来与前面的代码片段相同,但是通过这个代码片段,我得到了一个包含所有必要记录的好 parquet 文件。

这看起来真的很奇怪,因为在这两种情况下,当我将这些 PCollection 变平时,我都会得到相同的 PCollection,而当我在 DirectRunner 上运行它时,一切正常,没有任何问题。

Beam 版本是 2.14.0,Spark 版本是 2.2.3。

有谁知道为什么在第一种情况下会发生这种情况?

标签: javaapache-beam

解决方案


推荐阅读