首页 > 解决方案 > 数据流不同的操作不缩放

问题描述

我有一个带有“最终”阶段的线性管道,每秒输出大约 200k 个元素(短字符串)。

但是,当我在该阶段 ( ) 之后添加 Distinct 操作时,myPCollection.apply(Distinct.<String>create());之前阶段的速度Distinct下降到每秒处理的大约 80k 个元素。

但是,我正在处理一个没有最大工作人员数量的有界集合,因此我希望 Dataflow 能够自动增加工作人员的数量以匹配工作负载。这不仅不会发生,当我手动启动有许多工人(20+)的管道时,它会自动缩小到几个工人。

如何使 Dataflow 升级工作池,以便此 Distinct 操作不会显着降低管道的处理速率?

标签: performancegoogle-cloud-dataflowapache-beamautoscaling

解决方案


看看. _Distinct

如您所见,它首先对元素进行分组,然后再选取第一个元素。我已经提交了一个错误来改善这种行为。

在当前的实现中,所有元素首先被分组,这需要将它们写入持久存储,然后再被拾取。如果您有任何多次出现的元素(即热键),您将遇到可以写出多少数据的瓶颈。

作为一个技巧,您可以在写出元素之前添加一个重复数据删除的 DoFn 。像这样的东西:

class MapperDedupFn extends DoFn<String, String> {
    Set<String> seenElements;
    MapperDedupFn() {
      seenElements = new HashSet<>();
    }

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      if (seenElements.contains(element)) return;

      seenElements.add(element)
      receiver.output(word);
    }
  }
}

您应该能够在Distinct函数之前坚持这一点,并希望有更好的性能。


推荐阅读