首页 > 解决方案 > 如何仅在窗口完成时输出窗口聚合的结果?

问题描述

我有一个KStream我想在其中计算事件的某个维度。我这样做如下:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
  .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
  .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));

我想将KStream这些聚合作为事件进行更新。我可以像这样轻松地做到这一点:

ret.toStream().to("output");

问题是“输入”主题中的每个事件都会产生一个“输出”主题的事件。我只想在窗口完成时将事件发布到输出主题。例如,如果窗口为一分钟,则每键每分钟发送一个事件。

我想我可以这样做:

ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));

但我想知道是否有更好/更优雅的方式来做到这一点?

标签: apache-kafkaapache-kafka-streams

解决方案


您可以在 2.1 版本中使用 KTable KTable.suppress的新功能

此方法允许您为窗口计算的每个窗口/键获得一个最终结果。

更多关于suppresKIP -328

你可以像这样更新你的实现suppress

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
        .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
        .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()))
        .suppress(untilWindowCloses(BufferConfig.unbounded()));

ret.toStream().to("output"); // now stream should flush events to the output topic only when the window closes

推荐阅读