首页 > 解决方案 > 多微批风暴拓扑

问题描述

首先,如果我的问题重复,我深表歉意,我尝试搜索但找不到与我的问题相关的答案

首先真诚的道歉,如果我问一些非常基本的问题,因为我是 Storm 的初学者。而且如果我的问题是重复的,当我尝试搜索但找不到我的问题的相关答案时

请就我的以下用例提供建议。

我的用例:

我有一个 Spout 从一个内部消息传递机制读取数据,因为它以非常高的频率(100 秒/秒)接收和发送元组。

现在除了数据之外,每个元组还有一个频率(int)(因为总共可以有 4-5 种频率)。

现在我需要设计一个 Bolt 来批处理/汇集所有元组,并且只按频率定期发出,具有只发出最新元组的功能(如果在下一批之前收到重复),因为我们在元组数据中有一个基于字符串的键识别重复项。

例如

  1. 因此,所有以 25 秒为频率的元组将被汇集在一起​​,并由 Bolt 每 25 秒发出一次(如果在 25 秒内收到重复的元组,则只会考虑最新的一个)。

  2. 与所有 10 分钟的元组类似,因为频率将汇集在一起​​,并由 Bolt 每隔 10 分钟发出一次(如果在 10 分钟内收到重复的元组,则只会考虑最新的一个)。

** 现在,由于我们可以有 4-5 种类型的频率(例如 10 秒、25 秒、10 分钟、20 分钟等,这些都是配置的),并且每个元组都应该被组合成适当的批次并发出(如上面的例子)。

供参考。对于 Bolt 分组,我使用了“fieldsGrouping”,如下配置。

*.fieldsGrouping("FILTERING_BOLT",new Fields(PUBLISHING_FREQUENCY));*

请提供帮助或建议,对于我的用例来说,最好的方法是什么,因为只是想不出实现任何东西来处理并发元组的流动和管理 Storm 的内部并行性。

标签: apache-storm

解决方案


听起来你想要窗口螺栓https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html。可能你想要一个翻滚窗口(即窗口间隔之间没有重叠)

Windowing bolts 让你设置它们应该发射窗口的间隔(例如每10 秒),然后在调用你提供的execute 方法之前,bolt 将缓冲它在前10 秒接收到的所有元组。

我认为您想要的结构类似于例如

spout -> splitter -> 5 second window bolt
                  -> 10 second window bolt

拆分器应该接收元组,检查频率场并将元组发送到右侧窗口螺栓。您可以通过为每种频率类型声明一个流来做到这一点。

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare("5-sec-stream", ...);
    declarer.declare("10-sec-stream", ...);
}

public void execute(Tuple input) {
    if (frequencyIsFive(input)) {
        collector.emit("5-sec-stream", new Values(input.getValues()))
    }
    //more cases here
}

然后当你声明你的拓扑时

topologyBuilder.setBolt("splitter", new SplitterBolt())
     .shuffleGrouping("spout")

topologyBuilder.setBolt("5-second-window", new YourWindowingBolt())
     .globalGrouping("splitter", "5-sec-stream")

使所有 5 秒元组转到 5 秒窗口螺栓。

有关这方面的更多信息,特别是有关流和分组的部分,请参阅https://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html 。

在https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java有一个简单的窗口拓扑示例。

您可能需要注意的一件事是 Storm 的元组超时。如果您需要一个例如 10 分钟的窗口,您需要将元组超时从默认的 30 秒大幅提高,这样元组在队列中等待时不会超时。您可以通过设置来做到这一点,例如conf.setMessageTimeoutSecs(15*60)在配置拓扑时。您希望在窗口间隔和元组超时之间有一点余地,因为您希望尽可能避免元组超时。


推荐阅读