java - 将两个通用记录合并为一个并将其写入镶木地板文件
问题描述
在全局范围内,我们的想法是读取一个 parquet 文件(返回 PCollection)根据我们读取的文件创建新的 PCollection,将这两个 PCollection 合并为一个并将其写入其他位置的 parquet 文件中。
我正在 SparkRunner 上尝试此代码。
我不能发布完整的代码,但我会尝试指出逻辑。
这是一个代码片段:
PCollection<GenericRecord> oldGen = pipeline.apply(ParquetIO.read(SCHEMA).from(/path);
PCollection<GenericRecord> newGen = processNew(); //in this part I am making new PCollection
PCollectionList<GenericRecord> pList = PCollectionList.of(oldGen).and(newGen);
pList
.apply(Flatten.pCollections())
.setCoder(AvroCoder.of(GenericRecord.class, SCHEMA))
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(SCHEMA)).to(/outputlocation));
当我检查输出位置时,我只得到应该展平的两个 PCollection(没有规则是哪一个)。我尝试在展平集合后进行一些打印,打印看起来很好,我也尝试将这些数据写入 .txt 格式,并且效果也很好。
此外,我尝试了一种愚蠢的解决方案,令我惊讶的是,它以某种方式通过了。oldGen 参数有一列带有布尔标志。我已经将该 pcollection 转换为两个集合,一个带有 true 标志,另一个带有 false 标志。
PCollectionList 的片段如下所示:
PCollectionList<GenericRecord> pList = PCollectionList.of(newGen).and(trueGen).and(falseGen);
写入 parquet 看起来与前面的代码片段相同,但是通过这个代码片段,我得到了一个包含所有必要记录的好 parquet 文件。
这看起来真的很奇怪,因为在这两种情况下,当我将这些 PCollection 变平时,我都会得到相同的 PCollection,而当我在 DirectRunner 上运行它时,一切正常,没有任何问题。
Beam 版本是 2.14.0,Spark 版本是 2.2.3。
有谁知道为什么在第一种情况下会发生这种情况?
解决方案
推荐阅读
- javascript - 我可以将变量设置为输入元素的最大值吗?
- go - 允许在 net.Conn 和频道上选择 Go Routine 的方法?
- c++ - 无法解决 bfs 实现中的错误
- apache-kafka - 使用 FilePulse 从 XML 中输入 Kakfa 主题中的数据 - ExplodeFilter
- ios - 使用 OpenCV 框架的 Appcelerator iOS 模块的循序渐进
- django - 从包含 Django 应用程序的 Docker 访问位于 Windows 的 Firebird 数据库
- spring-webflux - 转换与转换延迟
- magento2 - magento 2 如何创建具有不同尺寸和不同颜色的可配置产品
- c# - 高级 datagridview 单元格格式无法正常工作
- python - Python——在循环中丢失并且无法获得正确的值