首页 > 解决方案 > 带有计数和持续时间的 Storm 中的翻滚窗口

问题描述

如何在 Storm 中创建 Tumbling 窗口具有两个阈值。例如,如果我将 WindowCount 设置为 500 并将 WindowDuration 设置为 5 秒,则即使消息少于 500 条但已过去 5 秒,窗口也应该得到处理。我可以看到这两个功能的独立 API

计数

.tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))

时间

.tumblingWindow(Duration.seconds(5), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))

我可以两者结合吗?

如果我通过 MessageCount 而不是 Duration 配置,当我停止拓扑时我的消息会发生什么?即使没有收到批数,Storm 也会处理这些消息吗?或者我会丢失这些消息吗?

标签: javatimecountwindowapache-storm

解决方案


我不相信你可以用当前的窗口 API 做到这一点。

该代码是可插入的,足以允许在内部使用,但您需要的 API 并未公开。有两个接口来定义如何处理窗口。

TriggerPolicy决定何时将窗口传递给bolt(例如“在缓冲100 个元组时传递”)。

EvictionPolicy决定何时从当前窗口驱逐元组(例如,“一旦元组在窗口中的最新元组后面超过 500 个元组,就丢弃元组”)。

您可以通过例如BaseWindowedBolt.withWindowLength间接配置这些策略,它在内部只是设置一些配置属性。这些属性用于确定WindowedBoltExecutor中的触发/驱逐策略。

我认为需要的是允许用户提供他们自己的自定义 TriggerPolicy/EvictionPolicy,或者添加一个新的 Trigger/EvictionPolicy 来满足你的需求。

如果您想为此提出问题,可以在https://issues.apache.org/jira/projects/STORM提出。如果你想贡献代码,源代码在https://github.com/apache/storm,你也可以在这里提出 PR。


推荐阅读