首页 > 解决方案 > 使用状态或计时器时避免重新洗牌?

问题描述

我发现自己在编写 Beam 管道的情况下,我想使用状态或计时器,其中数据可能已经被GroupByKey我不想打扰的前一个以某种方式分片。但是 API 说状态或计时器需要KV输入到PTransform. 也许有状态/定时转换在GroupByKey内部进行。

有没有办法在不重新分片/重新洗牌的情况下使用状态/计时器?

这是一个具体的用例:我在实现自己的指标收集而不是使用内置的 Stackdriver 指标时遇到了性能问题,在这种情况下,涉及洗牌的转换的系统延迟开始上升,并且通常在一段时间后无法恢复时间。

这是相关代码,我将来自不同地方的度量值汇集到一个键中,这些数据可能以不同的方式分片,仅仅是因为我需要使用计时器。

    metricsFromVariousPlaces
      .apply(Flatten.pCollections())
      .apply(WithKeys.of(null.asInstanceOf[Void]))
      .apply("write influx points",
             new InfluxSinkTransform(...)

InfluxSinkTransform需要计时器以便及时刷新对 InfluxDB 的写入。

我知道这会导致重新洗牌,因为现在数据都在一个分片下。我预计这种改组会很昂贵,并希望尽可能避免。

我尝试保留上一次转换中的密钥,但看起来仍然在改组:

  "stage_id": "S68",
  "stage_name": "F503",
  "fused_steps": [
...
    {
      "name": "s29.org.apache.beam.sdk.values.PCollection.<init>:402#b70c45c110743c2b-write-streaming-shuffle430",
      "originalTransform": "snapshot/MapElements/Map.out0",
      "userName": "snapshot/MapElements/Map.out0/FromValue/WriteStream"
    }
  ]
...
{
  "stage_id": "S74",
  "stage_name": "F509",
  "fused_steps": [
    {
      "name": "s29.org.apache.beam.sdk.values.PCollection.<init>:402#b70c45c110743c2b-read-streaming-shuffle431",
      "originalTransform": "snapshot/MapElements/Map.out0",
      "userName": "snapshot/MapElements/Map.out0/FromValue/ReadStream"
    },
    {
      "name": "s30",
      "originalTransform": "snapshot/write influx points/ParDo(Do)",
      "userName": "snapshot/write influx points/ParDo(Do)"
    }
  ]

标签: google-cloud-dataflowapache-beam

解决方案


即使键保持不变,也无法使用状态或计时器而不引起改组。


推荐阅读