首页 > 解决方案 > 跨窗口复制数据以填充不定数量的点

问题描述

我正在尝试将数据记录复制到未来的窗口中。这解决的问题是每个窗口的计算统计数据将更加准确,因为这些数据是连续的(如温度)并且需要基线值。

在这些图中,每个框代表一个固定窗口。每个窗口中的数字代表来自源的 PCollection 中的数据。

这是一个示例输入 PCollection:

+---------+---------+---------+--------->
| 1  2    |       3 |         |         |
+---------+---------+---------+--------->

结果输出 PCollection:

+---------+---------+---------+--------->
| 1  2    | 2     3 | 3       | 3       |
+---------+---------+---------+--------->

请注意最新数据点(基于事件时间戳)如何转发到下一个窗口。如果有多个空窗口,则必须重新转发该值。

我已经解决了转发一次​​的问题,方法是通过一个有状态的 DoFn 运行窗口 PCollection,该 DoFn 发出一个额外的重复和修改的元素:

public class DupeFn extends DoFn<Datum, Datum> {
    @StateId("latest")
    private final StateSpec<ValueState<Datum>> latestStateSpec = StateSpecs.value();

    @TimerId("emit")
    private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void processElement(
            @Element Datum element,
            OutputReceiver<Datum> receiver,
            IntervalWindow window,
            @StateId("latest") ValueState<Datum> latest,
            @TimerId("emit") Timer emit
    ) {
        emit.set(window.maxTimestamp());

        Datum prev = latest.read();

        if (prev == null || element.timestamp > prev.timestamp) {
            latest.write(element);
        }

        receiver.output(element);
    }

    @OnTimer("emit")
    public void emitLatest(
            OutputReceiver<Datum> receiver,
            IntervalWindow window,
            @StateId("latest") ValueState<Datum> latest
    ) {
        Datum last = latest.read();

        // modify the timestamp such that it lands in the next window
        last.timestamp = window.end().getMillis() + 10;
        last.id += " DUPED";

        receiver.outputWithTimestamp(last, new Instant(last.timestamp));
    }
}

现在的问题是,如果有一个空窗口,则不会将任何内容复制到进行中的窗口中。理想情况下,行为将如上图所述。

有没有办法做到这一点?

编辑
我发现了这篇相关的未发表的博客文章。

标签: javagoogle-cloud-dataflowapache-beam

解决方案


目前 Beam 文档存在一个小问题,一旦修复,博客就会出现。循环计时器将为您提供解决此问题的部分方法。因为即使在没有数据的情况下,它也会确保每个间隔窗口中都有活动。

编辑:博客现在可以在这里使用链接到博客

下一部分需要使用全局窗口,这会带来更多的复杂性。下周的 Apache Beam 峰会上将对此进行讨论。

柏林峰会

为了保持状态,您需要将固定窗口聚合流到 GlobalWindow。但是 GlobalWindow 不保证顺序,因此您需要遵循以下流程:

@ProcessElement

@OnTimer

  • 按时间戳读取和排序 BaggedList
  • 如果下一个聚合没有值(它是使用循环计时器而不是外部数据源制作的),则将每个聚合的最终值链接到下一个聚合中。
  • 输出时间戳为 < 然后 OnTimer.Timestamp 的所有值
  • 清除任何已处理元素的包列表,请注意今天这是低效的,因为您无法从列表中删除特定元素。如果您查看 Apache Beam 上的开发列表,将会有一个很好的讨论,关于未来对 Sorted Map 的请求,这将非常有用。

对不起,它不是一个简短的答案!


推荐阅读