java - 带有计数和持续时间的 Storm 中的翻滚窗口
问题描述
如何在 Storm 中创建 Tumbling 窗口具有两个阈值。例如,如果我将 WindowCount 设置为 500 并将 WindowDuration 设置为 5 秒,则即使消息少于 500 条但已过去 5 秒,窗口也应该得到处理。我可以看到这两个功能的独立 API
计数
.tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
时间
.tumblingWindow(Duration.seconds(5), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
我可以两者结合吗?
如果我通过 MessageCount 而不是 Duration 配置,当我停止拓扑时我的消息会发生什么?即使没有收到批数,Storm 也会处理这些消息吗?或者我会丢失这些消息吗?
解决方案
我不相信你可以用当前的窗口 API 做到这一点。
该代码是可插入的,足以允许在内部使用,但您需要的 API 并未公开。有两个接口来定义如何处理窗口。
TriggerPolicy决定何时将窗口传递给bolt(例如“在缓冲100 个元组时传递”)。
EvictionPolicy决定何时从当前窗口驱逐元组(例如,“一旦元组在窗口中的最新元组后面超过 500 个元组,就丢弃元组”)。
您可以通过例如BaseWindowedBolt.withWindowLength间接配置这些策略,它在内部只是设置一些配置属性。这些属性用于确定WindowedBoltExecutor中的触发/驱逐策略。
我认为需要的是允许用户提供他们自己的自定义 TriggerPolicy/EvictionPolicy,或者添加一个新的 Trigger/EvictionPolicy 来满足你的需求。
如果您想为此提出问题,可以在https://issues.apache.org/jira/projects/STORM提出。如果你想贡献代码,源代码在https://github.com/apache/storm,你也可以在这里提出 PR。
推荐阅读
- azure - 如何删除天蓝色工件符号包
- python - 运行 Celery 任务锁定 Flask
- python - 如何根据其他列的某些相等性向CSV添加新列?
- javascript - 从内存中删除图像对象
- angular - 如何使用打字稿和角度在动态下拉列表中创建“子菜单”?
- sql - SQL SERVER - 触发器限制超过 3 条记录
- xamarin.forms - Xamarin Google Places 位置
- notepad++ - 热门删除/替换标签但保留内容?
- visual-studio - 有没有办法在 Visual Studio 中评论选定的行而不是选定的文本范围?
- flutter - dart 中的 DateFormat 错误:“尝试从 17-04-2020 的位置 10 读取 -YYYY”