apache-kafka - 如何仅在窗口完成时输出窗口聚合的结果?
问题描述
我有一个KStream
我想在其中计算事件的某个维度。我这样做如下:
KTable<Windowed<Long>, Counter> ret = input.groupByKey()
.windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
.aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));
我想将KStream
这些聚合作为事件进行更新。我可以像这样轻松地做到这一点:
ret.toStream().to("output");
问题是“输入”主题中的每个事件都会产生一个“输出”主题的事件。我只想在窗口完成时将事件发布到输出主题。例如,如果窗口为一分钟,则每键每分钟发送一个事件。
我想我可以这样做:
ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));
但我想知道是否有更好/更优雅的方式来做到这一点?
解决方案
您可以在 2.1 版本中使用 KTable KTable.suppress的新功能
此方法允许您为窗口计算的每个窗口/键获得一个最终结果。
更多关于suppres
KIP -328
你可以像这样更新你的实现suppress
:
KTable<Windowed<Long>, Counter> ret = input.groupByKey()
.windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
.aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()))
.suppress(untilWindowCloses(BufferConfig.unbounded()));
ret.toStream().to("output"); // now stream should flush events to the output topic only when the window closes
推荐阅读
- flutter - 如何制作可调整大小的TextField?
- javascript - 如何在节点 js 中写入 xls 文件和流以响应
- r - dplyr:计算每天的账户余额
- html - 在 HTML/CSS 中覆盖浏览器默认字体 - 但不是代码字体?
- ios - 创建可重用的 UIView 子类 - 遇到大小问题
- swift - 如何快速执行一个函数 5 秒
- android - Android:运行测试用例时:无法确定任务':app:compileDebugAndroidTestJavaWithJavac'的依赖关系
- python - 如何将 self 参数传递给 python cProfile
- reactjs - 应用 find() 时获得 2 个渲染而不是 1 个渲染
- reactjs - React 构建文件不适用于共享主机