performance - 数据流不同的操作不缩放
问题描述
我有一个带有“最终”阶段的线性管道,每秒输出大约 200k 个元素(短字符串)。
但是,当我在该阶段 ( ) 之后添加 Distinct 操作时,myPCollection.apply(Distinct.<String>create());
之前阶段的速度Distinct
下降到每秒处理的大约 80k 个元素。
但是,我正在处理一个没有最大工作人员数量的有界集合,因此我希望 Dataflow 能够自动增加工作人员的数量以匹配工作负载。这不仅不会发生,当我手动启动有许多工人(20+)的管道时,它会自动缩小到几个工人。
如何使 Dataflow 升级工作池,以便此 Distinct 操作不会显着降低管道的处理速率?
解决方案
看看. _Distinct
如您所见,它首先对元素进行分组,然后再选取第一个元素。我已经提交了一个错误来改善这种行为。
在当前的实现中,所有元素首先被分组,这需要将它们写入持久存储,然后再被拾取。如果您有任何多次出现的元素(即热键),您将遇到可以写出多少数据的瓶颈。
作为一个技巧,您可以在写出元素之前添加一个重复数据删除的 DoFn 。像这样的东西:
class MapperDedupFn extends DoFn<String, String> {
Set<String> seenElements;
MapperDedupFn() {
seenElements = new HashSet<>();
}
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> receiver) {
if (seenElements.contains(element)) return;
seenElements.add(element)
receiver.output(word);
}
}
}
您应该能够在Distinct
函数之前坚持这一点,并希望有更好的性能。
推荐阅读
- python - 根据条件遍历不同级别的嵌套字典
- mysql - ZF3 - Zend\Db\Sql\Predicate\Expression - SQL DATE_FORMAT 上的错误转义
- java - Swagger/OpenAPI 注释 V3 - 在 swagger 注释中使用枚举值
- c - 与程序在同一目录下的所有文件的加密功能
- python - 使用 Docplex 最大限度地减少延迟百分比
- c# - 是否可以在 winforms 应用程序中使用 c# 从文件中提取元数据?
- javascript - 使用 javascript 复制永久链接
- javascript - 触摸移动不滑动滑动滑块上的多个元素
- node.js - Puppeteer 似乎不适用于 docker
- java - 如何从 ble 设备 android 读取数据?