merge - 数据流如何组合 PCollection 替换对象
问题描述
我想实现一个加载 2 种数据的过程,比如说 A 类和 B 类,PCollection<A> a1, PCollection<B> b1
. 然后我创建一个 View.asMap()a1
并将其dfn1
作为侧输入提供给 DoFn,该侧输入应用于b1
. 这个 DoFn 使用了 Kind A 的一些值并输出它们。之后,我想创建一个PCollection<A> a2
包含所有对象的新对象a1
,但替换由dfn1
.
让我们说a1
持有对象o1, b1, c1, d1, e1, f1, g1
dfn1
操作和输出b1 -> b2, c1 -> c2, g1 -> g2
到PCollection<A> a2
新的 PCollection 结合a1
并a2
应该包含o1, b2, c2, e1, f1, g2
是否有内置机制来完成类似的事情?集合可以在“合并”之前键入。
提前致谢。
由于我对问题的英文解释不满意,这里有一个 DoFn,它执行我所要求的。真正的问题是,如果有一个内置的转换可以做这样的事情,最好不要手动创建视图。
public class CombineKvCollectionsWithMasterCollection extends DoFn<KV<String, Object>, Object>{
private static final long serialVersionUID = 4100849850259729106L;
private PCollectionView<Map<String, Object>> masterView;
public CombineKvCollectionsWithMasterCollection(PCollectionView<Map<String, Object>> masterView) {
this.masterView = masterView;
}
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Object> kv = c.element();
Map<String, Object> masterMap = c.sideInput(masterView);
if (masterMap.containsKey(kv.getKey())) {
c.output(masterMap.get(kv.getKey()));
} else {
c.output(kv.getValue());
}
}
}
解决方案
推荐阅读
- arrays - 从列表列表中获取具有条件递增元素顺序的新列表
- java - 无法从 autocompletebubbletext 获取字符串
- java - 表示方程
- google-colaboratory - 从 URL 打开 Google Colab 笔记本
- c# - 如何将选定的excel文件的列和行导入listView c#
- ffmpeg - ffmpeg 在特定时间之前从 n 帧开始剪切视频?
- python - Numpy 随机函数为给定参数创建不一致的形状
- php - PHP 不执行 MySqli 查询
- yum - 在 ECS 优化的 Amazon Linux 映像上安装 Glusterfs 客户端
- polymer-3.x - Polymer-3.x 是否有任何在线编码编辑器/IDE?